1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
22 - LogSummary contains last N entries for every channel
23 - LogSummary (as "full") written on every commit
24 - LogSummary contains "keys" which LogEntryKey hash_set for the
25 same set of entries (for deduping)
29 - LogSummary contains, for each channel,
31 - end seq (last written seq + 1)
32 - LogSummary contains an LRUSet for tracking dups
33 - LogSummary written every N commits
34 - each LogEntry written in a separate key
35 - "%s/%08x" % (channel, seq) -> LogEntry
36 - per-commit record includes channel -> begin (trim bounds)
37 - 'external_log_to' meta records version to which we have logged externally
43 #include <boost/algorithm/string/predicate.hpp>
48 #include "LogMonitor.h"
50 #include "MonitorDBStore.h"
52 #include "messages/MMonCommand.h"
53 #include "messages/MLog.h"
54 #include "messages/MLogAck.h"
55 #include "common/Graylog.h"
56 #include "common/Journald.h"
57 #include "common/errno.h"
58 #include "common/strtol.h"
59 #include "include/ceph_assert.h"
60 #include "include/str_list.h"
61 #include "include/str_map.h"
62 #include "include/compat.h"
64 #define dout_subsys ceph_subsys_mon
66 using namespace TOPNSPC::common
;
77 using std::ostringstream
;
82 using std::stringstream
;
85 using std::unique_ptr
;
87 using ceph::bufferlist
;
90 using ceph::Formatter
;
91 using ceph::JSONFormatter
;
92 using ceph::make_message
;
93 using ceph::mono_clock
;
94 using ceph::mono_time
;
95 using ceph::timespan_str
;
97 string
LogMonitor::log_channel_info::get_log_file(const string
&channel
)
99 dout(25) << __func__
<< " for channel '"
100 << channel
<< "'" << dendl
;
102 if (expanded_log_file
.count(channel
) == 0) {
103 string fname
= expand_channel_meta(
104 get_str_map_key(log_file
, channel
, &CLOG_CONFIG_DEFAULT_KEY
),
106 expanded_log_file
[channel
] = fname
;
108 dout(20) << __func__
<< " for channel '"
109 << channel
<< "' expanded to '"
110 << fname
<< "'" << dendl
;
112 return expanded_log_file
[channel
];
116 void LogMonitor::log_channel_info::expand_channel_meta(map
<string
,string
> &m
)
118 dout(20) << __func__
<< " expand map: " << m
<< dendl
;
119 for (map
<string
,string
>::iterator p
= m
.begin(); p
!= m
.end(); ++p
) {
120 m
[p
->first
] = expand_channel_meta(p
->second
, p
->first
);
122 dout(20) << __func__
<< " expanded map: " << m
<< dendl
;
125 string
LogMonitor::log_channel_info::expand_channel_meta(
127 const string
&change_to
)
129 size_t pos
= string::npos
;
131 while ((pos
= s
.find(LOG_META_CHANNEL
)) != string::npos
) {
132 string tmp
= s
.substr(0, pos
) + change_to
;
133 if (pos
+LOG_META_CHANNEL
.length() < s
.length())
134 tmp
+= s
.substr(pos
+LOG_META_CHANNEL
.length());
137 dout(20) << __func__
<< " from '" << input
138 << "' to '" << s
<< "'" << dendl
;
143 bool LogMonitor::log_channel_info::do_log_to_syslog(const string
&channel
) {
144 string v
= get_str_map_key(log_to_syslog
, channel
,
145 &CLOG_CONFIG_DEFAULT_KEY
);
146 // We expect booleans, but they are in k/v pairs, kept
147 // as strings, in 'log_to_syslog'. We must ensure
148 // compatibility with existing boolean handling, and so
149 // we are here using a modified version of how
150 // md_config_t::set_val_raw() handles booleans. We will
151 // accept both 'true' and 'false', but will also check for
152 // '1' and '0'. The main distiction between this and the
153 // original code is that we will assume everything not '1',
154 // '0', 'true' or 'false' to be 'false'.
157 if (boost::iequals(v
, "false")) {
159 } else if (boost::iequals(v
, "true")) {
163 int b
= strict_strtol(v
.c_str(), 10, &err
);
164 ret
= (err
.empty() && b
== 1);
170 ceph::logging::Graylog::Ref
LogMonitor::log_channel_info::get_graylog(
171 const string
&channel
)
173 dout(25) << __func__
<< " for channel '"
174 << channel
<< "'" << dendl
;
176 if (graylogs
.count(channel
) == 0) {
177 auto graylog(std::make_shared
<ceph::logging::Graylog
>("mon"));
179 graylog
->set_fsid(g_conf().get_val
<uuid_d
>("fsid"));
180 graylog
->set_hostname(g_conf()->host
);
181 graylog
->set_destination(get_str_map_key(log_to_graylog_host
, channel
,
182 &CLOG_CONFIG_DEFAULT_KEY
),
183 atoi(get_str_map_key(log_to_graylog_port
, channel
,
184 &CLOG_CONFIG_DEFAULT_KEY
).c_str()));
186 graylogs
[channel
] = graylog
;
187 dout(20) << __func__
<< " for channel '"
188 << channel
<< "' to graylog host '"
189 << log_to_graylog_host
[channel
] << ":"
190 << log_to_graylog_port
[channel
]
193 return graylogs
[channel
];
196 ceph::logging::JournaldClusterLogger
&LogMonitor::log_channel_info::get_journald()
198 dout(25) << __func__
<< dendl
;
201 journald
= std::make_unique
<ceph::logging::JournaldClusterLogger
>();
206 void LogMonitor::log_channel_info::clear()
208 log_to_syslog
.clear();
209 syslog_level
.clear();
210 syslog_facility
.clear();
212 expanded_log_file
.clear();
213 log_file_level
.clear();
214 log_to_graylog
.clear();
215 log_to_graylog_host
.clear();
216 log_to_graylog_port
.clear();
217 log_to_journald
.clear();
222 LogMonitor::log_channel_info::log_channel_info() = default;
223 LogMonitor::log_channel_info::~log_channel_info() = default;
227 #define dout_prefix _prefix(_dout, mon, get_last_committed())
228 static ostream
& _prefix(std::ostream
*_dout
, Monitor
&mon
, version_t v
) {
229 return *_dout
<< "mon." << mon
.name
<< "@" << mon
.rank
230 << "(" << mon
.get_state_name()
231 << ").log v" << v
<< " ";
234 ostream
& operator<<(ostream
&out
, const LogMonitor
&pm
)
240 Tick function to update the map based on performance every N seconds
243 void LogMonitor::tick()
245 if (!is_active()) return;
247 dout(10) << *this << dendl
;
251 void LogMonitor::create_initial()
253 dout(10) << "create_initial -- creating initial map" << dendl
;
255 e
.name
= g_conf()->name
;
256 e
.rank
= entity_name_t::MON(mon
.rank
);
257 e
.addrs
= mon
.messenger
->get_myaddrs();
258 e
.stamp
= ceph_clock_now();
260 e
.channel
= CLOG_CHANNEL_CLUSTER
;
261 std::stringstream ss
;
262 ss
<< "mkfs " << mon
.monmap
->get_fsid();
265 pending_log
.insert(pair
<utime_t
,LogEntry
>(e
.stamp
, e
));
268 void LogMonitor::update_from_paxos(bool *need_bootstrap
)
270 dout(10) << __func__
<< dendl
;
271 version_t version
= get_last_committed();
272 dout(10) << __func__
<< " version " << version
273 << " summary v " << summary
.version
<< dendl
;
275 log_external_backlog();
277 if (version
== summary
.version
)
279 ceph_assert(version
>= summary
.version
);
281 version_t latest_full
= get_version_latest_full();
282 dout(10) << __func__
<< " latest full " << latest_full
<< dendl
;
283 if ((latest_full
> 0) && (latest_full
> summary
.version
)) {
284 bufferlist latest_bl
;
285 get_version_full(latest_full
, latest_bl
);
286 ceph_assert(latest_bl
.length() != 0);
287 dout(7) << __func__
<< " loading summary e" << latest_full
<< dendl
;
288 auto p
= latest_bl
.cbegin();
290 dout(7) << __func__
<< " loaded summary e" << summary
.version
<< dendl
;
293 // walk through incrementals
294 while (version
> summary
.version
) {
296 int err
= get_version(summary
.version
+1, bl
);
297 ceph_assert(err
== 0);
298 ceph_assert(bl
.length());
300 auto p
= bl
.cbegin();
304 // legacy pre-quincy commits
308 dout(7) << "update_from_paxos applying incremental log "
309 << summary
.version
+1 << " " << le
<< dendl
;
310 summary
.add_legacy(le
);
318 dout(7) << "update_from_paxos applying incremental log "
319 << summary
.version
+1 << " " << le
<< dendl
;
320 summary
.recent_keys
.insert(le
.key());
321 summary
.channel_info
[le
.channel
].second
++;
322 // we may have logged past the (persisted) summary in a prior quorum
323 if (version
> external_log_to
) {
327 map
<string
,version_t
> prune_channels_to
;
328 decode(prune_channels_to
, p
);
329 for (auto& [channel
, prune_to
] : prune_channels_to
) {
330 dout(20) << __func__
<< " channel " << channel
331 << " pruned to " << prune_to
<< dendl
;
332 summary
.channel_info
[channel
].first
= prune_to
;
334 // zero out pre-quincy fields (encode_pending needs this to reliably detect
336 summary
.tail_by_channel
.clear();
337 summary
.keys
.clear();
341 summary
.prune(g_conf()->mon_log_max_summary
);
343 dout(10) << " summary.channel_info " << summary
.channel_info
<< dendl
;
344 external_log_to
= version
;
345 mon
.store
->write_meta("external_log_to", stringify(external_log_to
));
350 void LogMonitor::log_external(const LogEntry
& le
)
352 string channel
= le
.channel
;
353 if (channel
.empty()) { // keep retrocompatibility
354 channel
= CLOG_CHANNEL_CLUSTER
;
357 if (g_conf().get_val
<bool>("mon_cluster_log_to_stderr")) {
358 cerr
<< channel
<< " " << le
<< std::endl
;
361 if (channels
.do_log_to_syslog(channel
)) {
362 string level
= channels
.get_level(channel
);
363 string facility
= channels
.get_facility(channel
);
364 if (level
.empty() || facility
.empty()) {
365 derr
<< __func__
<< " unable to log to syslog -- level or facility"
366 << " not defined (level: " << level
<< ", facility: "
367 << facility
<< ")" << dendl
;
369 le
.log_to_syslog(channels
.get_level(channel
),
370 channels
.get_facility(channel
));
374 if (channels
.do_log_to_graylog(channel
)) {
375 ceph::logging::Graylog::Ref graylog
= channels
.get_graylog(channel
);
377 graylog
->log_log_entry(&le
);
379 dout(7) << "graylog: " << channel
<< " " << graylog
380 << " host:" << channels
.log_to_graylog_host
<< dendl
;
383 if (channels
.do_log_to_journald(channel
)) {
384 auto &journald
= channels
.get_journald();
385 journald
.log_log_entry(le
);
386 dout(7) << "journald: " << channel
<< dendl
;
389 if (g_conf()->mon_cluster_log_to_file
) {
390 auto p
= channel_fds
.find(channel
);
392 if (p
== channel_fds
.end()) {
393 string log_file
= channels
.get_log_file(channel
);
394 dout(20) << __func__
<< " logging for channel '" << channel
395 << "' to file '" << log_file
<< "'" << dendl
;
396 if (log_file
.empty()) {
397 // do not log this channel
400 fd
= ::open(log_file
.c_str(), O_WRONLY
|O_APPEND
|O_CREAT
|O_CLOEXEC
, 0600);
403 dout(1) << "unable to write to '" << log_file
<< "' for channel '"
404 << channel
<< "': " << cpp_strerror(err
) << dendl
;
406 channel_fds
[channel
] = fd
;
414 fmt::format_to(file_log_buffer
, "{}\n", le
);
415 int err
= safe_write(fd
, file_log_buffer
.data(), file_log_buffer
.size());
416 file_log_buffer
.clear();
418 dout(1) << "error writing to '" << channels
.get_log_file(channel
)
419 << "' for channel '" << channel
420 << ": " << cpp_strerror(err
) << dendl
;
422 channel_fds
.erase(channel
);
428 void LogMonitor::log_external_close_fds()
430 for (auto& [channel
, fd
] : channel_fds
) {
432 dout(10) << __func__
<< " closing " << channel
<< " (" << fd
<< ")" << dendl
;
439 /// catch external logs up to summary.version
440 void LogMonitor::log_external_backlog()
442 if (!external_log_to
) {
444 int r
= mon
.store
->read_meta("external_log_to", &cur_str
);
446 external_log_to
= std::stoull(cur_str
);
447 dout(10) << __func__
<< " initialized external_log_to = " << external_log_to
448 << " (recorded log_to position)" << dendl
;
450 // pre-quincy, we assumed that anything through summary.version was
451 // logged externally.
452 assert(r
== -ENOENT
);
453 external_log_to
= summary
.version
;
454 dout(10) << __func__
<< " initialized external_log_to = " << external_log_to
455 << " (summary v " << summary
.version
<< ")" << dendl
;
458 // we may have logged ahead of summary.version, but never ahead of paxos
459 if (external_log_to
> get_last_committed()) {
460 derr
<< __func__
<< " rewinding external_log_to from " << external_log_to
461 << " -> " << get_last_committed() << " (sync_force? mon rebuild?)" << dendl
;
462 external_log_to
= get_last_committed();
464 if (external_log_to
>= summary
.version
) {
467 if (auto first
= get_first_committed(); external_log_to
< first
) {
468 derr
<< __func__
<< " local logs at " << external_log_to
469 << ", skipping to " << first
<< dendl
;
470 external_log_to
= first
;
471 // FIXME: write marker in each channel log file?
473 for (; external_log_to
< summary
.version
; ++external_log_to
) {
475 int err
= get_version(external_log_to
+1, bl
);
476 ceph_assert(err
== 0);
477 ceph_assert(bl
.length());
478 auto p
= bl
.cbegin();
485 while ((num
== -2 && !p
.end()) || (num
>= 0 && num
--)) {
491 mon
.store
->write_meta("external_log_to", stringify(external_log_to
));
494 void LogMonitor::create_pending()
497 pending_keys
.clear();
498 dout(10) << "create_pending v " << (get_last_committed() + 1) << dendl
;
501 void LogMonitor::generate_logentry_key(
502 const std::string
& channel
,
506 out
->append(channel
);
509 snprintf(vs
, sizeof(vs
), "%08llx", (unsigned long long)v
);
513 void LogMonitor::encode_pending(MonitorDBStore::TransactionRef t
)
515 version_t version
= get_last_committed() + 1;
517 dout(10) << __func__
<< " v" << version
<< dendl
;
519 if (mon
.monmap
->min_mon_release
< ceph_release_t::quincy
) {
520 // legacy encoding for pre-quincy quorum
522 encode(struct_v
, bl
);
523 for (auto& p
: pending_log
) {
524 p
.second
.encode(bl
, mon
.get_quorum_con_features());
526 put_version(t
, version
, bl
);
527 put_last_committed(t
, version
);
532 encode(struct_v
, bl
);
534 // first commit after upgrading to quincy?
535 if (!summary
.tail_by_channel
.empty()) {
536 // include past log entries
537 for (auto& p
: summary
.tail_by_channel
) {
538 for (auto& q
: p
.second
) {
539 pending_log
.emplace(make_pair(q
.second
.stamp
, q
.second
));
544 // record new entries
545 auto pending_channel_info
= summary
.channel_info
;
546 uint32_t num
= pending_log
.size();
548 dout(20) << __func__
<< " writing " << num
<< " entries" << dendl
;
549 for (auto& p
: pending_log
) {
551 p
.second
.encode(ebl
, mon
.get_quorum_con_features());
553 auto& bounds
= pending_channel_info
[p
.second
.channel
];
554 version_t v
= bounds
.second
++;
556 generate_logentry_key(p
.second
.channel
, v
, &key
);
557 t
->put(get_service_name(), key
, ebl
);
559 bl
.claim_append(ebl
);
562 // prune log entries?
563 map
<string
,version_t
> prune_channels_to
;
564 for (auto& [channel
, info
] : summary
.channel_info
) {
565 if (info
.second
- info
.first
> g_conf()->mon_log_max
) {
566 const version_t from
= info
.first
;
567 const version_t to
= info
.second
- g_conf()->mon_log_max
;
568 dout(10) << __func__
<< " pruning channel " << channel
569 << " " << from
<< " -> " << to
<< dendl
;
570 prune_channels_to
[channel
] = to
;
571 pending_channel_info
[channel
].first
= to
;
572 for (version_t v
= from
; v
< to
; ++v
) {
574 generate_logentry_key(channel
, v
, &key
);
575 t
->erase(get_service_name(), key
);
579 dout(20) << __func__
<< " prune_channels_to " << prune_channels_to
<< dendl
;
580 encode(prune_channels_to
, bl
);
582 put_version(t
, version
, bl
);
583 put_last_committed(t
, version
);
586 bool LogMonitor::should_stash_full()
588 if (mon
.monmap
->min_mon_release
< ceph_release_t::quincy
) {
589 // commit a LogSummary on every commit
593 // store periodic summary
594 auto period
= std::min
<uint64_t>(
595 g_conf()->mon_log_full_interval
,
596 g_conf()->mon_max_log_epochs
598 return (get_last_committed() - get_version_latest_full() > period
);
602 void LogMonitor::encode_full(MonitorDBStore::TransactionRef t
)
604 dout(10) << __func__
<< " log v " << summary
.version
<< dendl
;
605 ceph_assert(get_last_committed() == summary
.version
);
607 bufferlist summary_bl
;
608 encode(summary
, summary_bl
, mon
.get_quorum_con_features());
610 put_version_full(t
, summary
.version
, summary_bl
);
611 put_version_latest_full(t
, summary
.version
);
614 version_t
LogMonitor::get_trim_to() const
616 if (!mon
.is_leader())
619 unsigned max
= g_conf()->mon_max_log_epochs
;
620 version_t version
= get_last_committed();
622 return version
- max
;
626 bool LogMonitor::preprocess_query(MonOpRequestRef op
)
628 op
->mark_logmon_event("preprocess_query");
629 auto m
= op
->get_req
<PaxosServiceMessage
>();
630 dout(10) << "preprocess_query " << *m
<< " from " << m
->get_orig_source_inst() << dendl
;
631 switch (m
->get_type()) {
632 case MSG_MON_COMMAND
:
634 return preprocess_command(op
);
635 } catch (const bad_cmd_get
& e
) {
637 mon
.reply_command(op
, -EINVAL
, e
.what(), bl
, get_last_committed());
642 return preprocess_log(op
);
650 bool LogMonitor::prepare_update(MonOpRequestRef op
)
652 op
->mark_logmon_event("prepare_update");
653 auto m
= op
->get_req
<PaxosServiceMessage
>();
654 dout(10) << "prepare_update " << *m
<< " from " << m
->get_orig_source_inst() << dendl
;
655 switch (m
->get_type()) {
656 case MSG_MON_COMMAND
:
658 return prepare_command(op
);
659 } catch (const bad_cmd_get
& e
) {
661 mon
.reply_command(op
, -EINVAL
, e
.what(), bl
, get_last_committed());
665 return prepare_log(op
);
672 bool LogMonitor::preprocess_log(MonOpRequestRef op
)
674 op
->mark_logmon_event("preprocess_log");
675 auto m
= op
->get_req
<MLog
>();
676 dout(10) << "preprocess_log " << *m
<< " from " << m
->get_orig_source() << dendl
;
679 MonSession
*session
= op
->get_session();
682 if (!session
->is_capable("log", MON_CAP_W
)) {
683 dout(0) << "preprocess_log got MLog from entity with insufficient privileges "
684 << session
->caps
<< dendl
;
688 for (auto p
= m
->entries
.begin();
689 p
!= m
->entries
.end();
691 if (!summary
.contains(p
->key()))
695 dout(10) << " nothing new" << dendl
;
706 struct LogMonitor::C_Log
: public C_MonOp
{
708 C_Log(LogMonitor
*p
, MonOpRequestRef o
) :
709 C_MonOp(o
), logmon(p
) {}
710 void _finish(int r
) override
{
711 if (r
== -ECANCELED
) {
714 logmon
->_updated_log(op
);
718 bool LogMonitor::prepare_log(MonOpRequestRef op
)
720 op
->mark_logmon_event("prepare_log");
721 auto m
= op
->get_req
<MLog
>();
722 dout(10) << "prepare_log " << *m
<< " from " << m
->get_orig_source() << dendl
;
724 if (m
->fsid
!= mon
.monmap
->fsid
) {
725 dout(0) << "handle_log on fsid " << m
->fsid
<< " != " << mon
.monmap
->fsid
730 for (auto p
= m
->entries
.begin();
731 p
!= m
->entries
.end();
733 dout(10) << " logging " << *p
<< dendl
;
734 if (!summary
.contains(p
->key()) &&
735 !pending_keys
.count(p
->key())) {
736 pending_keys
.insert(p
->key());
737 pending_log
.insert(pair
<utime_t
,LogEntry
>(p
->stamp
, *p
));
740 wait_for_finished_proposal(op
, new C_Log(this, op
));
744 void LogMonitor::_updated_log(MonOpRequestRef op
)
746 auto m
= op
->get_req
<MLog
>();
747 dout(7) << "_updated_log for " << m
->get_orig_source_inst() << dendl
;
748 mon
.send_reply(op
, new MLogAck(m
->fsid
, m
->entries
.rbegin()->seq
));
751 bool LogMonitor::should_propose(double& delay
)
753 // commit now if we have a lot of pending events
754 if (g_conf()->mon_max_log_entries_per_event
> 0 &&
755 pending_log
.size() >= (unsigned)g_conf()->mon_max_log_entries_per_event
)
758 // otherwise fall back to generic policy
759 return PaxosService::should_propose(delay
);
763 bool LogMonitor::preprocess_command(MonOpRequestRef op
)
765 op
->mark_logmon_event("preprocess_command");
766 auto m
= op
->get_req
<MMonCommand
>();
772 if (!cmdmap_from_json(m
->cmd
, &cmdmap
, ss
)) {
773 string rs
= ss
.str();
774 mon
.reply_command(op
, -EINVAL
, rs
, get_last_committed());
777 MonSession
*session
= op
->get_session();
779 mon
.reply_command(op
, -EACCES
, "access denied", get_last_committed());
784 cmd_getval(cmdmap
, "prefix", prefix
);
786 string format
= cmd_getval_or
<string
>(cmdmap
, "format", "plain");
787 boost::scoped_ptr
<Formatter
> f(Formatter::create(format
));
789 if (prefix
== "log last") {
791 cmd_getval(cmdmap
, "num", num
);
793 f
->open_array_section("tail");
796 std::string level_str
;
798 if (cmd_getval(cmdmap
, "level", level_str
)) {
799 level
= LogEntry::str_to_level(level_str
);
800 if (level
== CLOG_UNKNOWN
) {
801 ss
<< "Invalid severity '" << level_str
<< "'";
802 mon
.reply_command(op
, -EINVAL
, ss
.str(), get_last_committed());
810 if (!cmd_getval(cmdmap
, "channel", channel
)) {
811 channel
= CLOG_CHANNEL_DEFAULT
;
814 // We'll apply this twice, once while counting out lines
815 // and once while outputting them.
816 auto match
= [level
](const LogEntry
&entry
) {
817 return entry
.prio
>= level
;
821 if (!summary
.tail_by_channel
.empty()) {
823 // Decrement operation that sets to container end when hitting rbegin
824 if (channel
== "*") {
825 list
<LogEntry
> full_tail
;
826 summary
.build_ordered_tail_legacy(&full_tail
);
827 auto rp
= full_tail
.rbegin();
828 for (; num
> 0 && rp
!= full_tail
.rend(); ++rp
) {
833 if (rp
== full_tail
.rend()) {
837 // Decrement a reverse iterator such that going past rbegin()
838 // sets it to rend(). This is for writing a for() loop that
839 // goes up to (and including) rbegin()
840 auto dec
= [&rp
, &full_tail
] () {
841 if (rp
== full_tail
.rbegin()) {
842 rp
= full_tail
.rend();
848 // Move forward to the end of the container (decrement the reverse
850 for (; rp
!= full_tail
.rend(); dec()) {
855 f
->dump_object("entry", *rp
);
861 auto p
= summary
.tail_by_channel
.find(channel
);
862 if (p
!= summary
.tail_by_channel
.end()) {
863 auto rp
= p
->second
.rbegin();
864 for (; num
> 0 && rp
!= p
->second
.rend(); ++rp
) {
865 if (match(rp
->second
)) {
869 if (rp
== p
->second
.rend()) {
873 // Decrement a reverse iterator such that going past rbegin()
874 // sets it to rend(). This is for writing a for() loop that
875 // goes up to (and including) rbegin()
876 auto dec
= [&rp
, &p
] () {
877 if (rp
== p
->second
.rbegin()) {
878 rp
= p
->second
.rend();
884 // Move forward to the end of the container (decrement the reverse
886 for (; rp
!= p
->second
.rend(); dec()) {
887 if (!match(rp
->second
)) {
891 f
->dump_object("entry", rp
->second
);
893 ss
<< rp
->second
<< "\n";
900 if (channel
== "*") {
901 // tail all channels; we need to mix by timestamp
902 multimap
<utime_t
,LogEntry
> entries
; // merge+sort all channels by timestamp
903 for (auto& p
: summary
.channel_info
) {
904 version_t from
= p
.second
.first
;
905 version_t to
= p
.second
.second
;
907 if (to
> (version_t
)num
) {
908 start
= std::max(to
- num
, from
);
912 dout(10) << __func__
<< " channnel " << p
.first
913 << " from " << from
<< " to " << to
<< dendl
;
914 for (version_t v
= start
; v
< to
; ++v
) {
917 generate_logentry_key(p
.first
, v
, &key
);
918 int r
= mon
.store
->get(get_service_name(), key
, ebl
);
920 derr
<< __func__
<< " missing key " << key
<< dendl
;
924 auto p
= ebl
.cbegin();
926 entries
.insert(make_pair(le
.stamp
, le
));
929 while ((int)entries
.size() > num
) {
930 entries
.erase(entries
.begin());
932 for (auto& p
: entries
) {
934 f
->dump_object("entry", p
.second
);
936 ss
<< p
.second
<< "\n";
941 auto p
= summary
.channel_info
.find(channel
);
942 if (p
!= summary
.channel_info
.end()) {
943 version_t from
= p
->second
.first
;
944 version_t to
= p
->second
.second
;
946 if (to
> (version_t
)num
) {
947 start
= std::max(to
- num
, from
);
951 dout(10) << __func__
<< " from " << from
<< " to " << to
<< dendl
;
952 for (version_t v
= start
; v
< to
; ++v
) {
955 generate_logentry_key(channel
, v
, &key
);
956 int r
= mon
.store
->get(get_service_name(), key
, ebl
);
958 derr
<< __func__
<< " missing key " << key
<< dendl
;
962 auto p
= ebl
.cbegin();
965 f
->dump_object("entry", le
);
977 rdata
.append(ss
.str());
986 mon
.reply_command(op
, r
, rs
, rdata
, get_last_committed());
991 bool LogMonitor::prepare_command(MonOpRequestRef op
)
993 op
->mark_logmon_event("prepare_command");
994 auto m
= op
->get_req
<MMonCommand
>();
1000 if (!cmdmap_from_json(m
->cmd
, &cmdmap
, ss
)) {
1001 // ss has reason for failure
1002 string rs
= ss
.str();
1003 mon
.reply_command(op
, -EINVAL
, rs
, get_last_committed());
1008 cmd_getval(cmdmap
, "prefix", prefix
);
1010 MonSession
*session
= op
->get_session();
1012 mon
.reply_command(op
, -EACCES
, "access denied", get_last_committed());
1016 if (prefix
== "log") {
1017 vector
<string
> logtext
;
1018 cmd_getval(cmdmap
, "logtext", logtext
);
1020 le
.rank
= m
->get_orig_source();
1021 le
.addrs
.v
.push_back(m
->get_orig_source_addr());
1022 le
.name
= session
->entity_name
;
1023 le
.stamp
= m
->get_recv_stamp();
1025 string level_str
= cmd_getval_or
<string
>(cmdmap
, "level", "info");
1026 le
.prio
= LogEntry::str_to_level(level_str
);
1027 le
.channel
= CLOG_CHANNEL_DEFAULT
;
1028 le
.msg
= str_join(logtext
, " ");
1029 pending_keys
.insert(le
.key());
1030 pending_log
.insert(pair
<utime_t
,LogEntry
>(le
.stamp
, le
));
1031 wait_for_finished_proposal(op
, new Monitor::C_Command(
1032 mon
, op
, 0, string(), get_last_committed() + 1));
1037 mon
.reply_command(op
, err
, rs
, get_last_committed());
1042 int LogMonitor::sub_name_to_id(const string
& n
)
1044 if (n
.substr(0, 4) == "log-" && n
.size() > 4) {
1045 return LogEntry::str_to_level(n
.substr(4));
1047 return CLOG_UNKNOWN
;
1051 void LogMonitor::check_subs()
1053 dout(10) << __func__
<< dendl
;
1054 for (map
<string
, xlist
<Subscription
*>*>::iterator i
= mon
.session_map
.subs
.begin();
1055 i
!= mon
.session_map
.subs
.end();
1057 for (xlist
<Subscription
*>::iterator j
= i
->second
->begin(); !j
.end(); ++j
) {
1058 if (sub_name_to_id((*j
)->type
) >= 0)
1064 void LogMonitor::check_sub(Subscription
*s
)
1066 dout(10) << __func__
<< " client wants " << s
->type
<< " ver " << s
->next
<< dendl
;
1068 int sub_level
= sub_name_to_id(s
->type
);
1069 ceph_assert(sub_level
>= 0);
1071 version_t summary_version
= summary
.version
;
1072 if (s
->next
> summary_version
) {
1073 dout(10) << __func__
<< " client " << s
->session
->name
1074 << " requested version (" << s
->next
<< ") is greater than ours ("
1075 << summary_version
<< "), which means we already sent him"
1076 << " everything we have." << dendl
;
1080 MLog
*mlog
= new MLog(mon
.monmap
->fsid
);
1083 /* First timer, heh? */
1084 _create_sub_incremental(mlog
, sub_level
, get_last_committed());
1086 /* let us send you an incremental log... */
1087 _create_sub_incremental(mlog
, sub_level
, s
->next
);
1090 dout(10) << __func__
<< " sending message to " << s
->session
->name
1091 << " with " << mlog
->entries
.size() << " entries"
1092 << " (version " << mlog
->version
<< ")" << dendl
;
1094 if (!mlog
->entries
.empty()) {
1095 s
->session
->con
->send_message(mlog
);
1100 mon
.session_map
.remove_sub(s
);
1102 s
->next
= summary_version
+1;
1106 * Create an incremental log message from version \p sv to \p summary.version
1108 * @param mlog Log message we'll send to the client with the messages received
1109 * since version \p sv, inclusive.
1110 * @param level The max log level of the messages the client is interested in.
1111 * @param sv The version the client is looking for.
1113 void LogMonitor::_create_sub_incremental(MLog
*mlog
, int level
, version_t sv
)
1115 dout(10) << __func__
<< " level " << level
<< " ver " << sv
1116 << " cur summary ver " << summary
.version
<< dendl
;
1118 if (sv
< get_first_committed()) {
1119 dout(10) << __func__
<< " skipped from " << sv
1120 << " to first_committed " << get_first_committed() << dendl
;
1122 le
.stamp
= ceph_clock_now();
1123 le
.prio
= CLOG_WARN
;
1125 ss
<< "skipped log messages from " << sv
<< " to " << get_first_committed();
1127 mlog
->entries
.push_back(le
);
1128 sv
= get_first_committed();
1131 version_t summary_ver
= summary
.version
;
1132 while (sv
&& sv
<= summary_ver
) {
1134 int err
= get_version(sv
, bl
);
1135 ceph_assert(err
== 0);
1136 ceph_assert(bl
.length());
1137 auto p
= bl
.cbegin();
1143 dout(20) << __func__
<< " sv " << sv
<< " has " << num
<< " entries" << dendl
;
1145 while ((num
== -2 && !p
.end()) || (num
>= 0 && num
--)) {
1148 if (le
.prio
< level
) {
1149 dout(20) << __func__
<< " requested " << level
1150 << ", skipping " << le
<< dendl
;
1153 mlog
->entries
.push_back(le
);
1155 mlog
->version
= sv
++;
1158 dout(10) << __func__
<< " incremental message ready ("
1159 << mlog
->entries
.size() << " entries)" << dendl
;
1162 void LogMonitor::update_log_channels()
1168 int r
= get_conf_str_map_helper(
1169 g_conf().get_val
<string
>("mon_cluster_log_to_syslog"),
1170 oss
, &channels
.log_to_syslog
,
1171 CLOG_CONFIG_DEFAULT_KEY
);
1173 derr
<< __func__
<< " error parsing 'mon_cluster_log_to_syslog'" << dendl
;
1177 r
= get_conf_str_map_helper(
1178 g_conf().get_val
<string
>("mon_cluster_log_to_syslog_level"),
1179 oss
, &channels
.syslog_level
,
1180 CLOG_CONFIG_DEFAULT_KEY
);
1182 derr
<< __func__
<< " error parsing 'mon_cluster_log_to_syslog_level'"
1187 r
= get_conf_str_map_helper(
1188 g_conf().get_val
<string
>("mon_cluster_log_to_syslog_facility"),
1189 oss
, &channels
.syslog_facility
,
1190 CLOG_CONFIG_DEFAULT_KEY
);
1192 derr
<< __func__
<< " error parsing 'mon_cluster_log_to_syslog_facility'"
1197 r
= get_conf_str_map_helper(
1198 g_conf().get_val
<string
>("mon_cluster_log_file"), oss
,
1200 CLOG_CONFIG_DEFAULT_KEY
);
1202 derr
<< __func__
<< " error parsing 'mon_cluster_log_file'" << dendl
;
1206 r
= get_conf_str_map_helper(
1207 g_conf().get_val
<string
>("mon_cluster_log_file_level"), oss
,
1208 &channels
.log_file_level
,
1209 CLOG_CONFIG_DEFAULT_KEY
);
1211 derr
<< __func__
<< " error parsing 'mon_cluster_log_file_level'"
1216 r
= get_conf_str_map_helper(
1217 g_conf().get_val
<string
>("mon_cluster_log_to_graylog"), oss
,
1218 &channels
.log_to_graylog
,
1219 CLOG_CONFIG_DEFAULT_KEY
);
1221 derr
<< __func__
<< " error parsing 'mon_cluster_log_to_graylog'"
1226 r
= get_conf_str_map_helper(
1227 g_conf().get_val
<string
>("mon_cluster_log_to_graylog_host"), oss
,
1228 &channels
.log_to_graylog_host
,
1229 CLOG_CONFIG_DEFAULT_KEY
);
1231 derr
<< __func__
<< " error parsing 'mon_cluster_log_to_graylog_host'"
1236 r
= get_conf_str_map_helper(
1237 g_conf().get_val
<string
>("mon_cluster_log_to_graylog_port"), oss
,
1238 &channels
.log_to_graylog_port
,
1239 CLOG_CONFIG_DEFAULT_KEY
);
1241 derr
<< __func__
<< " error parsing 'mon_cluster_log_to_graylog_port'"
1246 r
= get_conf_str_map_helper(
1247 g_conf().get_val
<string
>("mon_cluster_log_to_journald"), oss
,
1248 &channels
.log_to_journald
,
1249 CLOG_CONFIG_DEFAULT_KEY
);
1251 derr
<< __func__
<< " error parsing 'mon_cluster_log_to_journald'"
1256 channels
.expand_channel_meta();
1257 log_external_close_fds();
1261 void LogMonitor::handle_conf_change(const ConfigProxy
& conf
,
1262 const std::set
<std::string
> &changed
)
1264 if (changed
.count("mon_cluster_log_to_syslog") ||
1265 changed
.count("mon_cluster_log_to_syslog_level") ||
1266 changed
.count("mon_cluster_log_to_syslog_facility") ||
1267 changed
.count("mon_cluster_log_file") ||
1268 changed
.count("mon_cluster_log_file_level") ||
1269 changed
.count("mon_cluster_log_to_graylog") ||
1270 changed
.count("mon_cluster_log_to_graylog_host") ||
1271 changed
.count("mon_cluster_log_to_graylog_port") ||
1272 changed
.count("mon_cluster_log_to_journald") ||
1273 changed
.count("mon_cluster_log_to_file")) {
1274 update_log_channels();