ceph/src/mon/LogMonitor.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 /*
17
18 -- Storage scheme --
19
20 Pre-quincy:
21
22 - LogSummary contains last N entries for every channel
23 - LogSummary (as "full") written on every commit
24 - LogSummary contains "keys" which LogEntryKey hash_set for the
25 same set of entries (for deduping)
26
27 Quincy+:
28
29 - LogSummary contains, for each channel,
30 - start seq
31 - end seq (last written seq + 1)
32 - LogSummary contains an LRUSet for tracking dups
33 - LogSummary written every N commits
34 - each LogEntry written in a separate key
35 - "%s/%08x" % (channel, seq) -> LogEntry
36 - per-commit record includes channel -> begin (trim bounds)
37 - 'external_log_to' meta records version to which we have logged externally
38
39 */
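// Editor's illustration of the quincy+ layout above (hypothetical values): a
// channel "cluster" whose summary bounds are [0x18, 0x1b) is backed by one
// store key per entry under this service's prefix, e.g.
//
//   cluster/00000018 -> LogEntry
//   cluster/00000019 -> LogEntry
//   cluster/0000001a -> LogEntry
//
// See generate_logentry_key() below for the exact zero-padded hex formatting.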
40
41
42
43 #include <boost/algorithm/string/predicate.hpp>
44
45 #include <sstream>
46 #include <syslog.h>
47
48 #include "LogMonitor.h"
49 #include "Monitor.h"
50 #include "MonitorDBStore.h"
51
52 #include "messages/MMonCommand.h"
53 #include "messages/MLog.h"
54 #include "messages/MLogAck.h"
55 #include "common/Graylog.h"
56 #include "common/Journald.h"
57 #include "common/errno.h"
58 #include "common/strtol.h"
59 #include "include/ceph_assert.h"
60 #include "include/str_list.h"
61 #include "include/str_map.h"
62 #include "include/compat.h"
63
64 #define dout_subsys ceph_subsys_mon
65
66 using namespace TOPNSPC::common;
67
68 using std::cerr;
69 using std::cout;
70 using std::dec;
71 using std::hex;
72 using std::list;
73 using std::map;
74 using std::make_pair;
75 using std::multimap;
76 using std::ostream;
77 using std::ostringstream;
78 using std::pair;
79 using std::set;
80 using std::setfill;
81 using std::string;
82 using std::stringstream;
83 using std::to_string;
84 using std::vector;
85 using std::unique_ptr;
86
87 using ceph::bufferlist;
88 using ceph::decode;
89 using ceph::encode;
90 using ceph::Formatter;
91 using ceph::JSONFormatter;
92 using ceph::make_message;
93 using ceph::mono_clock;
94 using ceph::mono_time;
95 using ceph::timespan_str;
96
97 string LogMonitor::log_channel_info::get_log_file(const string &channel)
98 {
99 dout(25) << __func__ << " for channel '"
100 << channel << "'" << dendl;
101
102 if (expanded_log_file.count(channel) == 0) {
103 string fname = expand_channel_meta(
104 get_str_map_key(log_file, channel, &CLOG_CONFIG_DEFAULT_KEY),
105 channel);
106 expanded_log_file[channel] = fname;
107
108 dout(20) << __func__ << " for channel '"
109 << channel << "' expanded to '"
110 << fname << "'" << dendl;
111 }
112 return expanded_log_file[channel];
113 }
114
115
116 void LogMonitor::log_channel_info::expand_channel_meta(map<string,string> &m)
117 {
118 dout(20) << __func__ << " expand map: " << m << dendl;
119 for (map<string,string>::iterator p = m.begin(); p != m.end(); ++p) {
120 m[p->first] = expand_channel_meta(p->second, p->first);
121 }
122 dout(20) << __func__ << " expanded map: " << m << dendl;
123 }
124
125 string LogMonitor::log_channel_info::expand_channel_meta(
126 const string &input,
127 const string &change_to)
128 {
129 size_t pos = string::npos;
130 string s(input);
131 while ((pos = s.find(LOG_META_CHANNEL)) != string::npos) {
132 string tmp = s.substr(0, pos) + change_to;
133 if (pos+LOG_META_CHANNEL.length() < s.length())
134 tmp += s.substr(pos+LOG_META_CHANNEL.length());
135 s = tmp;
136 }
137 dout(20) << __func__ << " from '" << input
138 << "' to '" << s << "'" << dendl;
139
140 return s;
141 }
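// Example (assuming LOG_META_CHANNEL is the "$channel" placeholder defined in
// LogMonitor.h): expanding a configured file name for the "audit" channel,
//
//   expand_channel_meta("/var/log/ceph/$cluster.$channel.log", "audit")
//     -> "/var/log/ceph/$cluster.audit.log"
//
// Only the channel placeholder is substituted here; any other metavariables
// in the string are left untouched by this helper.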
142
143 bool LogMonitor::log_channel_info::do_log_to_syslog(const string &channel) {
144 string v = get_str_map_key(log_to_syslog, channel,
145 &CLOG_CONFIG_DEFAULT_KEY);
146 // We expect booleans, but they are in k/v pairs, kept
147 // as strings, in 'log_to_syslog'. We must ensure
148 // compatibility with existing boolean handling, and so
149 // we are here using a modified version of how
150 // md_config_t::set_val_raw() handles booleans. We will
151 // accept both 'true' and 'false', but will also check for
152 // '1' and '0'. The main distinction between this and the
153 // original code is that we will assume everything not '1',
154 // '0', 'true' or 'false' to be 'false'.
155 bool ret = false;
156
157 if (boost::iequals(v, "false")) {
158 ret = false;
159 } else if (boost::iequals(v, "true")) {
160 ret = true;
161 } else {
162 std::string err;
163 int b = strict_strtol(v.c_str(), 10, &err);
164 ret = (err.empty() && b == 1);
165 }
166
167 return ret;
168 }
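// For reference, the parsing above maps (case-insensitively):
//   "true" -> true,  "false" -> false,  "1" -> true,  "0" -> false,
//   and anything else (including an empty or unparseable value) -> false.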
169
170 ceph::logging::Graylog::Ref LogMonitor::log_channel_info::get_graylog(
171 const string &channel)
172 {
173 dout(25) << __func__ << " for channel '"
174 << channel << "'" << dendl;
175
176 if (graylogs.count(channel) == 0) {
177 auto graylog(std::make_shared<ceph::logging::Graylog>("mon"));
178
179 graylog->set_fsid(g_conf().get_val<uuid_d>("fsid"));
180 graylog->set_hostname(g_conf()->host);
181 graylog->set_destination(get_str_map_key(log_to_graylog_host, channel,
182 &CLOG_CONFIG_DEFAULT_KEY),
183 atoi(get_str_map_key(log_to_graylog_port, channel,
184 &CLOG_CONFIG_DEFAULT_KEY).c_str()));
185
186 graylogs[channel] = graylog;
187 dout(20) << __func__ << " for channel '"
188 << channel << "' to graylog host '"
189 << log_to_graylog_host[channel] << ":"
190 << log_to_graylog_port[channel]
191 << "'" << dendl;
192 }
193 return graylogs[channel];
194 }
195
196 ceph::logging::JournaldClusterLogger &LogMonitor::log_channel_info::get_journald()
197 {
198 dout(25) << __func__ << dendl;
199
200 if (!journald) {
201 journald = std::make_unique<ceph::logging::JournaldClusterLogger>();
202 }
203 return *journald;
204 }
205
206 void LogMonitor::log_channel_info::clear()
207 {
208 log_to_syslog.clear();
209 syslog_level.clear();
210 syslog_facility.clear();
211 log_file.clear();
212 expanded_log_file.clear();
213 log_file_level.clear();
214 log_to_graylog.clear();
215 log_to_graylog_host.clear();
216 log_to_graylog_port.clear();
217 log_to_journald.clear();
218 graylogs.clear();
219 journald.reset();
220 }
221
222 LogMonitor::log_channel_info::log_channel_info() = default;
223 LogMonitor::log_channel_info::~log_channel_info() = default;
224
225
226 #undef dout_prefix
227 #define dout_prefix _prefix(_dout, mon, get_last_committed())
228 static ostream& _prefix(std::ostream *_dout, Monitor &mon, version_t v) {
229 return *_dout << "mon." << mon.name << "@" << mon.rank
230 << "(" << mon.get_state_name()
231 << ").log v" << v << " ";
232 }
233
234 ostream& operator<<(ostream &out, const LogMonitor &pm)
235 {
236 return out << "log";
237 }
238
239 /*
240 Tick function, called periodically by the monitor; for the log service it currently only emits a debug line while active
241 */
242
243 void LogMonitor::tick()
244 {
245 if (!is_active()) return;
246
247 dout(10) << *this << dendl;
248
249 }
250
251 void LogMonitor::create_initial()
252 {
253 dout(10) << "create_initial -- creating initial map" << dendl;
254 LogEntry e;
255 e.name = g_conf()->name;
256 e.rank = entity_name_t::MON(mon.rank);
257 e.addrs = mon.messenger->get_myaddrs();
258 e.stamp = ceph_clock_now();
259 e.prio = CLOG_INFO;
260 e.channel = CLOG_CHANNEL_CLUSTER;
261 std::stringstream ss;
262 ss << "mkfs " << mon.monmap->get_fsid();
263 e.msg = ss.str();
264 e.seq = 0;
265 pending_log.insert(pair<utime_t,LogEntry>(e.stamp, e));
266 }
267
268 void LogMonitor::update_from_paxos(bool *need_bootstrap)
269 {
270 dout(10) << __func__ << dendl;
271 version_t version = get_last_committed();
272 dout(10) << __func__ << " version " << version
273 << " summary v " << summary.version << dendl;
274
275 log_external_backlog();
276
277 if (version == summary.version)
278 return;
279 ceph_assert(version >= summary.version);
280
281 version_t latest_full = get_version_latest_full();
282 dout(10) << __func__ << " latest full " << latest_full << dendl;
283 if ((latest_full > 0) && (latest_full > summary.version)) {
284 bufferlist latest_bl;
285 get_version_full(latest_full, latest_bl);
286 ceph_assert(latest_bl.length() != 0);
287 dout(7) << __func__ << " loading summary e" << latest_full << dendl;
288 auto p = latest_bl.cbegin();
289 decode(summary, p);
290 dout(7) << __func__ << " loaded summary e" << summary.version << dendl;
291 }
292
293 // walk through incrementals
294 while (version > summary.version) {
295 bufferlist bl;
296 int err = get_version(summary.version+1, bl);
297 ceph_assert(err == 0);
298 ceph_assert(bl.length());
299
300 auto p = bl.cbegin();
301 __u8 struct_v;
302 decode(struct_v, p);
303 if (struct_v == 1) {
304 // legacy pre-quincy commits
305 while (!p.end()) {
306 LogEntry le;
307 le.decode(p);
308 dout(7) << "update_from_paxos applying incremental log "
309 << summary.version+1 << " " << le << dendl;
310 summary.add_legacy(le);
311 }
312 } else {
313 uint32_t num;
314 decode(num, p);
315 while (num--) {
316 LogEntry le;
317 le.decode(p);
318 dout(7) << "update_from_paxos applying incremental log "
319 << summary.version+1 << " " << le << dendl;
320 summary.recent_keys.insert(le.key());
321 summary.channel_info[le.channel].second++;
322 // we may have logged past the (persisted) summary in a prior quorum
323 if (version > external_log_to) {
324 log_external(le);
325 }
326 }
327 map<string,version_t> prune_channels_to;
328 decode(prune_channels_to, p);
329 for (auto& [channel, prune_to] : prune_channels_to) {
330 dout(20) << __func__ << " channel " << channel
331 << " pruned to " << prune_to << dendl;
332 summary.channel_info[channel].first = prune_to;
333 }
334 // zero out pre-quincy fields (encode_pending needs this to reliably detect
335 // upgrade)
336 summary.tail_by_channel.clear();
337 summary.keys.clear();
338 }
339
340 summary.version++;
341 summary.prune(g_conf()->mon_log_max_summary);
342 }
343 dout(10) << " summary.channel_info " << summary.channel_info << dendl;
344 external_log_to = version;
345 mon.store->write_meta("external_log_to", stringify(external_log_to));
346
347 check_subs();
348 }
349
350 void LogMonitor::log_external(const LogEntry& le)
351 {
352 string channel = le.channel;
353 if (channel.empty()) { // for backward compatibility
354 channel = CLOG_CHANNEL_CLUSTER;
355 }
356
357 if (g_conf().get_val<bool>("mon_cluster_log_to_stderr")) {
358 cerr << channel << " " << le << std::endl;
359 }
360
361 if (channels.do_log_to_syslog(channel)) {
362 string level = channels.get_level(channel);
363 string facility = channels.get_facility(channel);
364 if (level.empty() || facility.empty()) {
365 derr << __func__ << " unable to log to syslog -- level or facility"
366 << " not defined (level: " << level << ", facility: "
367 << facility << ")" << dendl;
368 } else {
369 le.log_to_syslog(channels.get_level(channel),
370 channels.get_facility(channel));
371 }
372 }
373
374 if (channels.do_log_to_graylog(channel)) {
375 ceph::logging::Graylog::Ref graylog = channels.get_graylog(channel);
376 if (graylog) {
377 graylog->log_log_entry(&le);
378 }
379 dout(7) << "graylog: " << channel << " " << graylog
380 << " host:" << channels.log_to_graylog_host << dendl;
381 }
382
383 if (channels.do_log_to_journald(channel)) {
384 auto &journald = channels.get_journald();
385 journald.log_log_entry(le);
386 dout(7) << "journald: " << channel << dendl;
387 }
388
389 if (g_conf()->mon_cluster_log_to_file) {
390 auto p = channel_fds.find(channel);
391 int fd;
392 if (p == channel_fds.end()) {
393 string log_file = channels.get_log_file(channel);
394 dout(20) << __func__ << " logging for channel '" << channel
395 << "' to file '" << log_file << "'" << dendl;
396 if (log_file.empty()) {
397 // do not log this channel
398 fd = -1;
399 } else {
400 fd = ::open(log_file.c_str(), O_WRONLY|O_APPEND|O_CREAT|O_CLOEXEC, 0600);
401 if (fd < 0) {
402 int err = -errno;
403 dout(1) << "unable to write to '" << log_file << "' for channel '"
404 << channel << "': " << cpp_strerror(err) << dendl;
405 } else {
406 channel_fds[channel] = fd;
407 }
408 }
409 } else {
410 fd = p->second;
411 }
412
413 if (fd >= 0) {
414 fmt::format_to(file_log_buffer, "{}\n", le);
415 int err = safe_write(fd, file_log_buffer.data(), file_log_buffer.size());
416 file_log_buffer.clear();
417 if (err < 0) {
418 dout(1) << "error writing to '" << channels.get_log_file(channel)
419 << "' for channel '" << channel
420 << ": " << cpp_strerror(err) << dendl;
421 ::close(fd);
422 channel_fds.erase(channel);
423 }
424 }
425 }
426 }
427
428 void LogMonitor::log_external_close_fds()
429 {
430 for (auto& [channel, fd] : channel_fds) {
431 if (fd >= 0) {
432 dout(10) << __func__ << " closing " << channel << " (" << fd << ")" << dendl;
433 ::close(fd);
434 }
435 }
436 channel_fds.clear();
437 }
438
439 /// catch external logs up to summary.version
440 void LogMonitor::log_external_backlog()
441 {
442 if (!external_log_to) {
443 std::string cur_str;
444 int r = mon.store->read_meta("external_log_to", &cur_str);
445 if (r == 0) {
446 external_log_to = std::stoull(cur_str);
447 dout(10) << __func__ << " initialized external_log_to = " << external_log_to
448 << " (recorded log_to position)" << dendl;
449 } else {
450 // pre-quincy, we assumed that anything through summary.version was
451 // logged externally.
452 assert(r == -ENOENT);
453 external_log_to = summary.version;
454 dout(10) << __func__ << " initialized external_log_to = " << external_log_to
455 << " (summary v " << summary.version << ")" << dendl;
456 }
457 }
458 // we may have logged ahead of summary.version, but never ahead of paxos
459 if (external_log_to > get_last_committed()) {
460 derr << __func__ << " rewinding external_log_to from " << external_log_to
461 << " -> " << get_last_committed() << " (sync_force? mon rebuild?)" << dendl;
462 external_log_to = get_last_committed();
463 }
464 if (external_log_to >= summary.version) {
465 return;
466 }
467 if (auto first = get_first_committed(); external_log_to < first) {
468 derr << __func__ << " local logs at " << external_log_to
469 << ", skipping to " << first << dendl;
470 external_log_to = first;
471 // FIXME: write marker in each channel log file?
472 }
473 for (; external_log_to < summary.version; ++external_log_to) {
474 bufferlist bl;
475 int err = get_version(external_log_to+1, bl);
476 ceph_assert(err == 0);
477 ceph_assert(bl.length());
478 auto p = bl.cbegin();
479 __u8 v;
480 decode(v, p);
481 int32_t num = -2;
482 if (v >= 2) {
483 decode(num, p);
484 }
485 while ((num == -2 && !p.end()) || (num >= 0 && num--)) {
486 LogEntry le;
487 le.decode(p);
488 log_external(le);
489 }
490 }
491 mon.store->write_meta("external_log_to", stringify(external_log_to));
492 }
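// Note on the decode loop above: incremental records are distinguished by
// struct_v -- v1 (pre-quincy) is a bare run of LogEntry encodings read until
// end-of-buffer, while v2 carries an explicit entry count; the (num == -2)
// sentinel covers the v1 case so both formats share one loop.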
493
494 void LogMonitor::create_pending()
495 {
496 pending_log.clear();
497 pending_keys.clear();
498 dout(10) << "create_pending v " << (get_last_committed() + 1) << dendl;
499 }
500
501 void LogMonitor::generate_logentry_key(
502 const std::string& channel,
503 version_t v,
504 std::string *out)
505 {
506 out->append(channel);
507 out->append("/");
508 char vs[10];
509 snprintf(vs, sizeof(vs), "%08llx", (unsigned long long)v);
510 out->append(vs);
511 }
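// Usage sketch (hypothetical values): the key for seq 26 of the "cluster"
// channel comes out as
//
//   std::string key;
//   generate_logentry_key("cluster", 26, &key);   // key == "cluster/0000001a"
//
// i.e. the channel name plus an 8-digit zero-padded hex sequence number,
// matching the "%s/%08x" scheme described at the top of this file.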
512
513 void LogMonitor::encode_pending(MonitorDBStore::TransactionRef t)
514 {
515 version_t version = get_last_committed() + 1;
516 bufferlist bl;
517 dout(10) << __func__ << " v" << version << dendl;
518
519 if (mon.monmap->min_mon_release < ceph_release_t::quincy) {
520 // legacy encoding for pre-quincy quorum
521 __u8 struct_v = 1;
522 encode(struct_v, bl);
523 for (auto& p : pending_log) {
524 p.second.encode(bl, mon.get_quorum_con_features());
525 }
526 put_version(t, version, bl);
527 put_last_committed(t, version);
528 return;
529 }
530
531 __u8 struct_v = 2;
532 encode(struct_v, bl);
533
534 // first commit after upgrading to quincy?
535 if (!summary.tail_by_channel.empty()) {
536 // include past log entries
537 for (auto& p : summary.tail_by_channel) {
538 for (auto& q : p.second) {
539 pending_log.emplace(make_pair(q.second.stamp, q.second));
540 }
541 }
542 }
543
544 // record new entries
545 auto pending_channel_info = summary.channel_info;
546 uint32_t num = pending_log.size();
547 encode(num, bl);
548 dout(20) << __func__ << " writing " << num << " entries" << dendl;
549 for (auto& p : pending_log) {
550 bufferlist ebl;
551 p.second.encode(ebl, mon.get_quorum_con_features());
552
553 auto& bounds = pending_channel_info[p.second.channel];
554 version_t v = bounds.second++;
555 std::string key;
556 generate_logentry_key(p.second.channel, v, &key);
557 t->put(get_service_name(), key, ebl);
558
559 bl.claim_append(ebl);
560 }
561
562 // prune log entries?
563 map<string,version_t> prune_channels_to;
564 for (auto& [channel, info] : summary.channel_info) {
565 if (info.second - info.first > g_conf()->mon_log_max) {
566 const version_t from = info.first;
567 const version_t to = info.second - g_conf()->mon_log_max;
568 dout(10) << __func__ << " pruning channel " << channel
569 << " " << from << " -> " << to << dendl;
570 prune_channels_to[channel] = to;
571 pending_channel_info[channel].first = to;
572 for (version_t v = from; v < to; ++v) {
573 std::string key;
574 generate_logentry_key(channel, v, &key);
575 t->erase(get_service_name(), key);
576 }
577 }
578 }
579 dout(20) << __func__ << " prune_channels_to " << prune_channels_to << dendl;
580 encode(prune_channels_to, bl);
581
582 put_version(t, version, bl);
583 put_last_committed(t, version);
584 }
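// Resulting v2 incremental record layout (as decoded in update_from_paxos()
// and log_external_backlog() above):
//
//   __u8                   struct_v = 2
//   uint32_t               num               // count of LogEntry encodings
//   LogEntry               entries[num]
//   map<string,version_t>  prune_channels_to // per-channel trim bound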
585
586 bool LogMonitor::should_stash_full()
587 {
588 if (mon.monmap->min_mon_release < ceph_release_t::quincy) {
589 // commit a LogSummary on every commit
590 return true;
591 }
592
593 // store periodic summary
594 auto period = std::min<uint64_t>(
595 g_conf()->mon_log_full_interval,
596 g_conf()->mon_max_log_epochs
597 );
598 return (get_last_committed() - get_version_latest_full() > period);
599 }
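// e.g. (hypothetical values) with mon_log_full_interval = 100 and
// mon_max_log_epochs = 500, a quincy+ quorum stashes a full LogSummary once
// the last full version falls more than 100 commits behind, while a
// pre-quincy quorum stashes one on every commit.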
600
601
602 void LogMonitor::encode_full(MonitorDBStore::TransactionRef t)
603 {
604 dout(10) << __func__ << " log v " << summary.version << dendl;
605 ceph_assert(get_last_committed() == summary.version);
606
607 bufferlist summary_bl;
608 encode(summary, summary_bl, mon.get_quorum_con_features());
609
610 put_version_full(t, summary.version, summary_bl);
611 put_version_latest_full(t, summary.version);
612 }
613
614 version_t LogMonitor::get_trim_to() const
615 {
616 if (!mon.is_leader())
617 return 0;
618
619 unsigned max = g_conf()->mon_max_log_epochs;
620 version_t version = get_last_committed();
621 if (version > max)
622 return version - max;
623 return 0;
624 }
625
626 bool LogMonitor::preprocess_query(MonOpRequestRef op)
627 {
628 op->mark_logmon_event("preprocess_query");
629 auto m = op->get_req<PaxosServiceMessage>();
630 dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
631 switch (m->get_type()) {
632 case MSG_MON_COMMAND:
633 try {
634 return preprocess_command(op);
635 } catch (const bad_cmd_get& e) {
636 bufferlist bl;
637 mon.reply_command(op, -EINVAL, e.what(), bl, get_last_committed());
638 return true;
639 }
640
641 case MSG_LOG:
642 return preprocess_log(op);
643
644 default:
645 ceph_abort();
646 return true;
647 }
648 }
649
650 bool LogMonitor::prepare_update(MonOpRequestRef op)
651 {
652 op->mark_logmon_event("prepare_update");
653 auto m = op->get_req<PaxosServiceMessage>();
654 dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
655 switch (m->get_type()) {
656 case MSG_MON_COMMAND:
657 try {
658 return prepare_command(op);
659 } catch (const bad_cmd_get& e) {
660 bufferlist bl;
661 mon.reply_command(op, -EINVAL, e.what(), bl, get_last_committed());
662 return true;
663 }
664 case MSG_LOG:
665 return prepare_log(op);
666 default:
667 ceph_abort();
668 return false;
669 }
670 }
671
672 bool LogMonitor::preprocess_log(MonOpRequestRef op)
673 {
674 op->mark_logmon_event("preprocess_log");
675 auto m = op->get_req<MLog>();
676 dout(10) << "preprocess_log " << *m << " from " << m->get_orig_source() << dendl;
677 int num_new = 0;
678
679 MonSession *session = op->get_session();
680 if (!session)
681 goto done;
682 if (!session->is_capable("log", MON_CAP_W)) {
683 dout(0) << "preprocess_log got MLog from entity with insufficient privileges "
684 << session->caps << dendl;
685 goto done;
686 }
687
688 for (auto p = m->entries.begin();
689 p != m->entries.end();
690 ++p) {
691 if (!summary.contains(p->key()))
692 num_new++;
693 }
694 if (!num_new) {
695 dout(10) << " nothing new" << dendl;
696 goto done;
697 }
698
699 return false;
700
701 done:
702 mon.no_reply(op);
703 return true;
704 }
705
706 struct LogMonitor::C_Log : public C_MonOp {
707 LogMonitor *logmon;
708 C_Log(LogMonitor *p, MonOpRequestRef o) :
709 C_MonOp(o), logmon(p) {}
710 void _finish(int r) override {
711 if (r == -ECANCELED) {
712 return;
713 }
714 logmon->_updated_log(op);
715 }
716 };
717
718 bool LogMonitor::prepare_log(MonOpRequestRef op)
719 {
720 op->mark_logmon_event("prepare_log");
721 auto m = op->get_req<MLog>();
722 dout(10) << "prepare_log " << *m << " from " << m->get_orig_source() << dendl;
723
724 if (m->fsid != mon.monmap->fsid) {
725 dout(0) << "handle_log on fsid " << m->fsid << " != " << mon.monmap->fsid
726 << dendl;
727 return false;
728 }
729
730 for (auto p = m->entries.begin();
731 p != m->entries.end();
732 ++p) {
733 dout(10) << " logging " << *p << dendl;
734 if (!summary.contains(p->key()) &&
735 !pending_keys.count(p->key())) {
736 pending_keys.insert(p->key());
737 pending_log.insert(pair<utime_t,LogEntry>(p->stamp, *p));
738 }
739 }
740 wait_for_finished_proposal(op, new C_Log(this, op));
741 return true;
742 }
743
744 void LogMonitor::_updated_log(MonOpRequestRef op)
745 {
746 auto m = op->get_req<MLog>();
747 dout(7) << "_updated_log for " << m->get_orig_source_inst() << dendl;
748 mon.send_reply(op, new MLogAck(m->fsid, m->entries.rbegin()->seq));
749 }
750
751 bool LogMonitor::should_propose(double& delay)
752 {
753 // commit now if we have a lot of pending events
754 if (g_conf()->mon_max_log_entries_per_event > 0 &&
755 pending_log.size() >= (unsigned)g_conf()->mon_max_log_entries_per_event)
756 return true;
757
758 // otherwise fall back to generic policy
759 return PaxosService::should_propose(delay);
760 }
761
762
763 bool LogMonitor::preprocess_command(MonOpRequestRef op)
764 {
765 op->mark_logmon_event("preprocess_command");
766 auto m = op->get_req<MMonCommand>();
767 int r = -EINVAL;
768 bufferlist rdata;
769 stringstream ss;
770
771 cmdmap_t cmdmap;
772 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
773 string rs = ss.str();
774 mon.reply_command(op, -EINVAL, rs, get_last_committed());
775 return true;
776 }
777 MonSession *session = op->get_session();
778 if (!session) {
779 mon.reply_command(op, -EACCES, "access denied", get_last_committed());
780 return true;
781 }
782
783 string prefix;
784 cmd_getval(cmdmap, "prefix", prefix);
785
786 string format = cmd_getval_or<string>(cmdmap, "format", "plain");
787 boost::scoped_ptr<Formatter> f(Formatter::create(format));
788
789 if (prefix == "log last") {
790 int64_t num = 20;
791 cmd_getval(cmdmap, "num", num);
792 if (f) {
793 f->open_array_section("tail");
794 }
795
796 std::string level_str;
797 clog_type level;
798 if (cmd_getval(cmdmap, "level", level_str)) {
799 level = LogEntry::str_to_level(level_str);
800 if (level == CLOG_UNKNOWN) {
801 ss << "Invalid severity '" << level_str << "'";
802 mon.reply_command(op, -EINVAL, ss.str(), get_last_committed());
803 return true;
804 }
805 } else {
806 level = CLOG_INFO;
807 }
808
809 std::string channel;
810 if (!cmd_getval(cmdmap, "channel", channel)) {
811 channel = CLOG_CHANNEL_DEFAULT;
812 }
813
814 // We'll apply this twice, once while counting out lines
815 // and once while outputting them.
816 auto match = [level](const LogEntry &entry) {
817 return entry.prio >= level;
818 };
819
820 ostringstream ss;
821 if (!summary.tail_by_channel.empty()) {
822 // pre-quincy compat
823 // Decrement operation that sets to container end when hitting rbegin
824 if (channel == "*") {
825 list<LogEntry> full_tail;
826 summary.build_ordered_tail_legacy(&full_tail);
827 auto rp = full_tail.rbegin();
828 for (; num > 0 && rp != full_tail.rend(); ++rp) {
829 if (match(*rp)) {
830 num--;
831 }
832 }
833 if (rp == full_tail.rend()) {
834 --rp;
835 }
836
837 // Decrement a reverse iterator such that going past rbegin()
838 // sets it to rend(). This is for writing a for() loop that
839 // goes up to (and including) rbegin()
840 auto dec = [&rp, &full_tail] () {
841 if (rp == full_tail.rbegin()) {
842 rp = full_tail.rend();
843 } else {
844 --rp;
845 }
846 };
847
848 // Move forward to the end of the container (decrement the reverse
849 // iterator).
850 for (; rp != full_tail.rend(); dec()) {
851 if (!match(*rp)) {
852 continue;
853 }
854 if (f) {
855 f->dump_object("entry", *rp);
856 } else {
857 ss << *rp << "\n";
858 }
859 }
860 } else {
861 auto p = summary.tail_by_channel.find(channel);
862 if (p != summary.tail_by_channel.end()) {
863 auto rp = p->second.rbegin();
864 for (; num > 0 && rp != p->second.rend(); ++rp) {
865 if (match(rp->second)) {
866 num--;
867 }
868 }
869 if (rp == p->second.rend()) {
870 --rp;
871 }
872
873 // Decrement a reverse iterator such that going past rbegin()
874 // sets it to rend(). This is for writing a for() loop that
875 // goes up to (and including) rbegin()
876 auto dec = [&rp, &p] () {
877 if (rp == p->second.rbegin()) {
878 rp = p->second.rend();
879 } else {
880 --rp;
881 }
882 };
883
884 // Move forward to the end of the container (decrement the reverse
885 // iterator).
886 for (; rp != p->second.rend(); dec()) {
887 if (!match(rp->second)) {
888 continue;
889 }
890 if (f) {
891 f->dump_object("entry", rp->second);
892 } else {
893 ss << rp->second << "\n";
894 }
895 }
896 }
897 }
898 } else {
899 // quincy+
900 if (channel == "*") {
901 // tail all channels; we need to mix by timestamp
902 multimap<utime_t,LogEntry> entries; // merge+sort all channels by timestamp
903 for (auto& p : summary.channel_info) {
904 version_t from = p.second.first;
905 version_t to = p.second.second;
906 version_t start;
907 if (to > (version_t)num) {
908 start = std::max(to - num, from);
909 } else {
910 start = from;
911 }
912 dout(10) << __func__ << " channel " << p.first
913 << " from " << from << " to " << to << dendl;
914 for (version_t v = start; v < to; ++v) {
915 bufferlist ebl;
916 string key;
917 generate_logentry_key(p.first, v, &key);
918 int r = mon.store->get(get_service_name(), key, ebl);
919 if (r < 0) {
920 derr << __func__ << " missing key " << key << dendl;
921 continue;
922 }
923 LogEntry le;
924 auto p = ebl.cbegin();
925 decode(le, p);
926 entries.insert(make_pair(le.stamp, le));
927 }
928 }
929 while ((int)entries.size() > num) {
930 entries.erase(entries.begin());
931 }
932 for (auto& p : entries) {
933 if (f) {
934 f->dump_object("entry", p.second);
935 } else {
936 ss << p.second << "\n";
937 }
938 }
939 } else {
940 // tail one channel
941 auto p = summary.channel_info.find(channel);
942 if (p != summary.channel_info.end()) {
943 version_t from = p->second.first;
944 version_t to = p->second.second;
945 version_t start;
946 if (to > (version_t)num) {
947 start = std::max(to - num, from);
948 } else {
949 start = from;
950 }
951 dout(10) << __func__ << " from " << from << " to " << to << dendl;
952 for (version_t v = start; v < to; ++v) {
953 bufferlist ebl;
954 string key;
955 generate_logentry_key(channel, v, &key);
956 int r = mon.store->get(get_service_name(), key, ebl);
957 if (r < 0) {
958 derr << __func__ << " missing key " << key << dendl;
959 continue;
960 }
961 LogEntry le;
962 auto p = ebl.cbegin();
963 decode(le, p);
964 if (f) {
965 f->dump_object("entry", le);
966 } else {
967 ss << le << "\n";
968 }
969 }
970 }
971 }
972 }
973 if (f) {
974 f->close_section();
975 f->flush(rdata);
976 } else {
977 rdata.append(ss.str());
978 }
979 r = 0;
980 } else {
981 return false;
982 }
983
984 string rs;
985 getline(ss, rs);
986 mon.reply_command(op, r, rs, rdata, get_last_committed());
987 return true;
988 }
989
990
991 bool LogMonitor::prepare_command(MonOpRequestRef op)
992 {
993 op->mark_logmon_event("prepare_command");
994 auto m = op->get_req<MMonCommand>();
995 stringstream ss;
996 string rs;
997 int err = -EINVAL;
998
999 cmdmap_t cmdmap;
1000 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
1001 // ss has reason for failure
1002 string rs = ss.str();
1003 mon.reply_command(op, -EINVAL, rs, get_last_committed());
1004 return true;
1005 }
1006
1007 string prefix;
1008 cmd_getval(cmdmap, "prefix", prefix);
1009
1010 MonSession *session = op->get_session();
1011 if (!session) {
1012 mon.reply_command(op, -EACCES, "access denied", get_last_committed());
1013 return true;
1014 }
1015
1016 if (prefix == "log") {
1017 vector<string> logtext;
1018 cmd_getval(cmdmap, "logtext", logtext);
1019 LogEntry le;
1020 le.rank = m->get_orig_source();
1021 le.addrs.v.push_back(m->get_orig_source_addr());
1022 le.name = session->entity_name;
1023 le.stamp = m->get_recv_stamp();
1024 le.seq = 0;
1025 string level_str = cmd_getval_or<string>(cmdmap, "level", "info");
1026 le.prio = LogEntry::str_to_level(level_str);
1027 le.channel = CLOG_CHANNEL_DEFAULT;
1028 le.msg = str_join(logtext, " ");
1029 pending_keys.insert(le.key());
1030 pending_log.insert(pair<utime_t,LogEntry>(le.stamp, le));
1031 wait_for_finished_proposal(op, new Monitor::C_Command(
1032 mon, op, 0, string(), get_last_committed() + 1));
1033 return true;
1034 }
1035
1036 getline(ss, rs);
1037 mon.reply_command(op, err, rs, get_last_committed());
1038 return false;
1039 }
1040
1041
1042 int LogMonitor::sub_name_to_id(const string& n)
1043 {
1044 if (n.substr(0, 4) == "log-" && n.size() > 4) {
1045 return LogEntry::str_to_level(n.substr(4));
1046 } else {
1047 return CLOG_UNKNOWN;
1048 }
1049 }
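// e.g. sub_name_to_id("log-info") or sub_name_to_id("log-error") yields the
// corresponding clog level, while names without the "log-" prefix (or with an
// unrecognized level suffix) yield CLOG_UNKNOWN.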
1050
1051 void LogMonitor::check_subs()
1052 {
1053 dout(10) << __func__ << dendl;
1054 for (map<string, xlist<Subscription*>*>::iterator i = mon.session_map.subs.begin();
1055 i != mon.session_map.subs.end();
1056 ++i) {
1057 for (xlist<Subscription*>::iterator j = i->second->begin(); !j.end(); ++j) {
1058 if (sub_name_to_id((*j)->type) >= 0)
1059 check_sub(*j);
1060 }
1061 }
1062 }
1063
1064 void LogMonitor::check_sub(Subscription *s)
1065 {
1066 dout(10) << __func__ << " client wants " << s->type << " ver " << s->next << dendl;
1067
1068 int sub_level = sub_name_to_id(s->type);
1069 ceph_assert(sub_level >= 0);
1070
1071 version_t summary_version = summary.version;
1072 if (s->next > summary_version) {
1073 dout(10) << __func__ << " client " << s->session->name
1074 << " requested version (" << s->next << ") is greater than ours ("
1075 << summary_version << "), which means we already sent him"
1076 << " everything we have." << dendl;
1077 return;
1078 }
1079
1080 MLog *mlog = new MLog(mon.monmap->fsid);
1081
1082 if (s->next == 0) {
1083 /* First timer, heh? */
1084 _create_sub_incremental(mlog, sub_level, get_last_committed());
1085 } else {
1086 /* let us send you an incremental log... */
1087 _create_sub_incremental(mlog, sub_level, s->next);
1088 }
1089
1090 dout(10) << __func__ << " sending message to " << s->session->name
1091 << " with " << mlog->entries.size() << " entries"
1092 << " (version " << mlog->version << ")" << dendl;
1093
1094 if (!mlog->entries.empty()) {
1095 s->session->con->send_message(mlog);
1096 } else {
1097 mlog->put();
1098 }
1099 if (s->onetime)
1100 mon.session_map.remove_sub(s);
1101 else
1102 s->next = summary_version+1;
1103 }
1104
1105 /**
1106 * Create an incremental log message from version \p sv to \p summary.version
1107 *
1108 * @param mlog Log message we'll send to the client with the messages received
1109 * since version \p sv, inclusive.
1110 * @param level The max log level of the messages the client is interested in.
1111 * @param sv The version the client is looking for.
1112 */
1113 void LogMonitor::_create_sub_incremental(MLog *mlog, int level, version_t sv)
1114 {
1115 dout(10) << __func__ << " level " << level << " ver " << sv
1116 << " cur summary ver " << summary.version << dendl;
1117
1118 if (sv < get_first_committed()) {
1119 dout(10) << __func__ << " skipped from " << sv
1120 << " to first_committed " << get_first_committed() << dendl;
1121 LogEntry le;
1122 le.stamp = ceph_clock_now();
1123 le.prio = CLOG_WARN;
1124 ostringstream ss;
1125 ss << "skipped log messages from " << sv << " to " << get_first_committed();
1126 le.msg = ss.str();
1127 mlog->entries.push_back(le);
1128 sv = get_first_committed();
1129 }
1130
1131 version_t summary_ver = summary.version;
1132 while (sv && sv <= summary_ver) {
1133 bufferlist bl;
1134 int err = get_version(sv, bl);
1135 ceph_assert(err == 0);
1136 ceph_assert(bl.length());
1137 auto p = bl.cbegin();
1138 __u8 v;
1139 decode(v, p);
1140 int32_t num = -2;
1141 if (v >= 2) {
1142 decode(num, p);
1143 dout(20) << __func__ << " sv " << sv << " has " << num << " entries" << dendl;
1144 }
1145 while ((num == -2 && !p.end()) || (num >= 0 && num--)) {
1146 LogEntry le;
1147 le.decode(p);
1148 if (le.prio < level) {
1149 dout(20) << __func__ << " requested " << level
1150 << ", skipping " << le << dendl;
1151 continue;
1152 }
1153 mlog->entries.push_back(le);
1154 }
1155 mlog->version = sv++;
1156 }
1157
1158 dout(10) << __func__ << " incremental message ready ("
1159 << mlog->entries.size() << " entries)" << dendl;
1160 }
1161
1162 void LogMonitor::update_log_channels()
1163 {
1164 ostringstream oss;
1165
1166 channels.clear();
1167
1168 int r = get_conf_str_map_helper(
1169 g_conf().get_val<string>("mon_cluster_log_to_syslog"),
1170 oss, &channels.log_to_syslog,
1171 CLOG_CONFIG_DEFAULT_KEY);
1172 if (r < 0) {
1173 derr << __func__ << " error parsing 'mon_cluster_log_to_syslog'" << dendl;
1174 return;
1175 }
1176
1177 r = get_conf_str_map_helper(
1178 g_conf().get_val<string>("mon_cluster_log_to_syslog_level"),
1179 oss, &channels.syslog_level,
1180 CLOG_CONFIG_DEFAULT_KEY);
1181 if (r < 0) {
1182 derr << __func__ << " error parsing 'mon_cluster_log_to_syslog_level'"
1183 << dendl;
1184 return;
1185 }
1186
1187 r = get_conf_str_map_helper(
1188 g_conf().get_val<string>("mon_cluster_log_to_syslog_facility"),
1189 oss, &channels.syslog_facility,
1190 CLOG_CONFIG_DEFAULT_KEY);
1191 if (r < 0) {
1192 derr << __func__ << " error parsing 'mon_cluster_log_to_syslog_facility'"
1193 << dendl;
1194 return;
1195 }
1196
1197 r = get_conf_str_map_helper(
1198 g_conf().get_val<string>("mon_cluster_log_file"), oss,
1199 &channels.log_file,
1200 CLOG_CONFIG_DEFAULT_KEY);
1201 if (r < 0) {
1202 derr << __func__ << " error parsing 'mon_cluster_log_file'" << dendl;
1203 return;
1204 }
1205
1206 r = get_conf_str_map_helper(
1207 g_conf().get_val<string>("mon_cluster_log_file_level"), oss,
1208 &channels.log_file_level,
1209 CLOG_CONFIG_DEFAULT_KEY);
1210 if (r < 0) {
1211 derr << __func__ << " error parsing 'mon_cluster_log_file_level'"
1212 << dendl;
1213 return;
1214 }
1215
1216 r = get_conf_str_map_helper(
1217 g_conf().get_val<string>("mon_cluster_log_to_graylog"), oss,
1218 &channels.log_to_graylog,
1219 CLOG_CONFIG_DEFAULT_KEY);
1220 if (r < 0) {
1221 derr << __func__ << " error parsing 'mon_cluster_log_to_graylog'"
1222 << dendl;
1223 return;
1224 }
1225
1226 r = get_conf_str_map_helper(
1227 g_conf().get_val<string>("mon_cluster_log_to_graylog_host"), oss,
1228 &channels.log_to_graylog_host,
1229 CLOG_CONFIG_DEFAULT_KEY);
1230 if (r < 0) {
1231 derr << __func__ << " error parsing 'mon_cluster_log_to_graylog_host'"
1232 << dendl;
1233 return;
1234 }
1235
1236 r = get_conf_str_map_helper(
1237 g_conf().get_val<string>("mon_cluster_log_to_graylog_port"), oss,
1238 &channels.log_to_graylog_port,
1239 CLOG_CONFIG_DEFAULT_KEY);
1240 if (r < 0) {
1241 derr << __func__ << " error parsing 'mon_cluster_log_to_graylog_port'"
1242 << dendl;
1243 return;
1244 }
1245
1246 r = get_conf_str_map_helper(
1247 g_conf().get_val<string>("mon_cluster_log_to_journald"), oss,
1248 &channels.log_to_journald,
1249 CLOG_CONFIG_DEFAULT_KEY);
1250 if (r < 0) {
1251 derr << __func__ << " error parsing 'mon_cluster_log_to_journald'"
1252 << dendl;
1253 return;
1254 }
1255
1256 channels.expand_channel_meta();
1257 log_external_close_fds();
1258 }
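// The options parsed above are per-channel "key=value" maps; for example
// (illustrative values only, not the shipped defaults):
//
//   mon_cluster_log_to_syslog = "default=false audit=true"
//   mon_cluster_log_file      = "default=/var/log/ceph/$cluster.$channel.log"
//
// where the entry under CLOG_CONFIG_DEFAULT_KEY supplies the fallback for
// channels without an explicit setting.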
1259
1260
1261 void LogMonitor::handle_conf_change(const ConfigProxy& conf,
1262 const std::set<std::string> &changed)
1263 {
1264 if (changed.count("mon_cluster_log_to_syslog") ||
1265 changed.count("mon_cluster_log_to_syslog_level") ||
1266 changed.count("mon_cluster_log_to_syslog_facility") ||
1267 changed.count("mon_cluster_log_file") ||
1268 changed.count("mon_cluster_log_file_level") ||
1269 changed.count("mon_cluster_log_to_graylog") ||
1270 changed.count("mon_cluster_log_to_graylog_host") ||
1271 changed.count("mon_cluster_log_to_graylog_port") ||
1272 changed.count("mon_cluster_log_to_journald") ||
1273 changed.count("mon_cluster_log_to_file")) {
1274 update_log_channels();
1275 }
1276 }