]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/LogMonitor.cc
80e069d593af0f37132ac58eeaef07a77582b50f
[ceph.git] / ceph / src / mon / LogMonitor.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 /*
17
18 -- Storage scheme --
19
20 Pre-quincy:
21
22 - LogSummary contains last N entries for every channel
23 - LogSummary (as "full") written on every commit
- LogSummary contains "keys", a LogEntryKey hash_set covering the
  same set of entries (used for deduping)
26
27 Quincy+:
28
29 - LogSummary contains, for each channel,
30 - start seq
31 - end seq (last written seq + 1)
32 - LogSummary contains an LRUSet for tracking dups
33 - LogSummary written every N commits
34 - each LogEntry written in a separate key
35 - "%s/%08x" % (channel, seq) -> LogEntry
36 - per-commit record includes channel -> begin (trim bounds)
37 - 'external_log_to' meta records version to which we have logged externally
38
39 */
40
41
42
43 #include <boost/algorithm/string/predicate.hpp>
44
45 #include <sstream>
46 #include <syslog.h>
47
48 #include "LogMonitor.h"
49 #include "Monitor.h"
50 #include "MonitorDBStore.h"
51
52 #include "messages/MMonCommand.h"
53 #include "messages/MLog.h"
54 #include "messages/MLogAck.h"
55 #include "common/Graylog.h"
56 #include "common/Journald.h"
57 #include "common/errno.h"
58 #include "common/strtol.h"
59 #include "include/ceph_assert.h"
60 #include "include/str_list.h"
61 #include "include/str_map.h"
62 #include "include/compat.h"
63
64 #define dout_subsys ceph_subsys_mon
65
66 using namespace TOPNSPC::common;
67
68 using std::cerr;
69 using std::cout;
70 using std::dec;
71 using std::hex;
72 using std::list;
73 using std::map;
74 using std::make_pair;
75 using std::multimap;
76 using std::ostream;
77 using std::ostringstream;
78 using std::pair;
79 using std::set;
80 using std::setfill;
81 using std::string;
82 using std::stringstream;
83 using std::to_string;
84 using std::vector;
85 using std::unique_ptr;
86
87 using ceph::bufferlist;
88 using ceph::decode;
89 using ceph::encode;
90 using ceph::Formatter;
91 using ceph::JSONFormatter;
92 using ceph::make_message;
93 using ceph::mono_clock;
94 using ceph::mono_time;
95 using ceph::timespan_str;
96
97 string LogMonitor::log_channel_info::get_log_file(const string &channel)
98 {
99 dout(25) << __func__ << " for channel '"
100 << channel << "'" << dendl;
101
102 if (expanded_log_file.count(channel) == 0) {
103 string fname = expand_channel_meta(
104 get_str_map_key(log_file, channel, &CLOG_CONFIG_DEFAULT_KEY),
105 channel);
106 expanded_log_file[channel] = fname;
107
108 dout(20) << __func__ << " for channel '"
109 << channel << "' expanded to '"
110 << fname << "'" << dendl;
111 }
112 return expanded_log_file[channel];
113 }
114
115
116 void LogMonitor::log_channel_info::expand_channel_meta(map<string,string> &m)
117 {
118 dout(20) << __func__ << " expand map: " << m << dendl;
119 for (map<string,string>::iterator p = m.begin(); p != m.end(); ++p) {
120 m[p->first] = expand_channel_meta(p->second, p->first);
121 }
122 dout(20) << __func__ << " expanded map: " << m << dendl;
123 }
124
125 string LogMonitor::log_channel_info::expand_channel_meta(
126 const string &input,
127 const string &change_to)
128 {
129 size_t pos = string::npos;
130 string s(input);
131 while ((pos = s.find(LOG_META_CHANNEL)) != string::npos) {
132 string tmp = s.substr(0, pos) + change_to;
133 if (pos+LOG_META_CHANNEL.length() < s.length())
134 tmp += s.substr(pos+LOG_META_CHANNEL.length());
135 s = tmp;
136 }
137 dout(20) << __func__ << " from '" << input
138 << "' to '" << s << "'" << dendl;
139
140 return s;
141 }
142
143 bool LogMonitor::log_channel_info::do_log_to_syslog(const string &channel) {
144 string v = get_str_map_key(log_to_syslog, channel,
145 &CLOG_CONFIG_DEFAULT_KEY);
146 // We expect booleans, but they are in k/v pairs, kept
147 // as strings, in 'log_to_syslog'. We must ensure
148 // compatibility with existing boolean handling, and so
149 // we are here using a modified version of how
150 // md_config_t::set_val_raw() handles booleans. We will
151 // accept both 'true' and 'false', but will also check for
152 // '1' and '0'. The main distiction between this and the
153 // original code is that we will assume everything not '1',
154 // '0', 'true' or 'false' to be 'false'.
155 bool ret = false;
156
157 if (boost::iequals(v, "false")) {
158 ret = false;
159 } else if (boost::iequals(v, "true")) {
160 ret = true;
161 } else {
162 std::string err;
163 int b = strict_strtol(v.c_str(), 10, &err);
164 ret = (err.empty() && b == 1);
165 }
166
167 return ret;
168 }
169
170 ceph::logging::Graylog::Ref LogMonitor::log_channel_info::get_graylog(
171 const string &channel)
172 {
173 dout(25) << __func__ << " for channel '"
174 << channel << "'" << dendl;
175
176 if (graylogs.count(channel) == 0) {
177 auto graylog(std::make_shared<ceph::logging::Graylog>("mon"));
178
179 graylog->set_fsid(g_conf().get_val<uuid_d>("fsid"));
180 graylog->set_hostname(g_conf()->host);
181 graylog->set_destination(get_str_map_key(log_to_graylog_host, channel,
182 &CLOG_CONFIG_DEFAULT_KEY),
183 atoi(get_str_map_key(log_to_graylog_port, channel,
184 &CLOG_CONFIG_DEFAULT_KEY).c_str()));
185
186 graylogs[channel] = graylog;
187 dout(20) << __func__ << " for channel '"
188 << channel << "' to graylog host '"
189 << log_to_graylog_host[channel] << ":"
190 << log_to_graylog_port[channel]
191 << "'" << dendl;
192 }
193 return graylogs[channel];
194 }
195
196 ceph::logging::JournaldClusterLogger &LogMonitor::log_channel_info::get_journald()
197 {
198 dout(25) << __func__ << dendl;
199
200 if (!journald) {
201 journald = std::make_unique<ceph::logging::JournaldClusterLogger>();
202 }
203 return *journald;
204 }
205
206 void LogMonitor::log_channel_info::clear()
207 {
208 log_to_syslog.clear();
209 syslog_level.clear();
210 syslog_facility.clear();
211 log_file.clear();
212 expanded_log_file.clear();
213 log_file_level.clear();
214 log_to_graylog.clear();
215 log_to_graylog_host.clear();
216 log_to_graylog_port.clear();
217 log_to_journald.clear();
218 graylogs.clear();
219 journald.reset();
220 }
221
// Constructor and destructor are defaulted here, out of line, rather than
// in the class definition.
// NOTE(review): presumably so the header can hold the std::unique_ptr
// `journald` member while JournaldClusterLogger is only complete in this
// file (after "common/Journald.h") — confirm against LogMonitor.h.
LogMonitor::log_channel_info::log_channel_info() = default;
LogMonitor::log_channel_info::~log_channel_info() = default;
224
225
226 #undef dout_prefix
227 #define dout_prefix _prefix(_dout, mon, get_last_committed())
228 static ostream& _prefix(std::ostream *_dout, Monitor &mon, version_t v) {
229 return *_dout << "mon." << mon.name << "@" << mon.rank
230 << "(" << mon.get_state_name()
231 << ").log v" << v << " ";
232 }
233
234 ostream& operator<<(ostream &out, const LogMonitor &pm)
235 {
236 return out << "log";
237 }
238
239 /*
240 Tick function to update the map based on performance every N seconds
241 */
242
243 void LogMonitor::tick()
244 {
245 if (!is_active()) return;
246
247 dout(10) << *this << dendl;
248
249 }
250
251 void LogMonitor::create_initial()
252 {
253 dout(10) << "create_initial -- creating initial map" << dendl;
254 LogEntry e;
255 e.name = g_conf()->name;
256 e.rank = entity_name_t::MON(mon.rank);
257 e.addrs = mon.messenger->get_myaddrs();
258 e.stamp = ceph_clock_now();
259 e.prio = CLOG_INFO;
260 e.channel = CLOG_CHANNEL_CLUSTER;
261 std::stringstream ss;
262 ss << "mkfs " << mon.monmap->get_fsid();
263 e.msg = ss.str();
264 e.seq = 0;
265 pending_log.insert(pair<utime_t,LogEntry>(e.stamp, e));
266 }
267
268 void LogMonitor::update_from_paxos(bool *need_bootstrap)
269 {
270 dout(10) << __func__ << dendl;
271 version_t version = get_last_committed();
272 dout(10) << __func__ << " version " << version
273 << " summary v " << summary.version << dendl;
274
275 log_external_backlog();
276
277 if (version == summary.version)
278 return;
279 ceph_assert(version >= summary.version);
280
281 version_t latest_full = get_version_latest_full();
282 dout(10) << __func__ << " latest full " << latest_full << dendl;
283 if ((latest_full > 0) && (latest_full > summary.version)) {
284 bufferlist latest_bl;
285 get_version_full(latest_full, latest_bl);
286 ceph_assert(latest_bl.length() != 0);
287 dout(7) << __func__ << " loading summary e" << latest_full << dendl;
288 auto p = latest_bl.cbegin();
289 decode(summary, p);
290 dout(7) << __func__ << " loaded summary e" << summary.version << dendl;
291 }
292
293 // walk through incrementals
294 while (version > summary.version) {
295 bufferlist bl;
296 int err = get_version(summary.version+1, bl);
297 ceph_assert(err == 0);
298 ceph_assert(bl.length());
299
300 auto p = bl.cbegin();
301 __u8 struct_v;
302 decode(struct_v, p);
303 if (struct_v == 1) {
304 // legacy pre-quincy commits
305 while (!p.end()) {
306 LogEntry le;
307 le.decode(p);
308 dout(7) << "update_from_paxos applying incremental log "
309 << summary.version+1 << " " << le << dendl;
310 summary.add_legacy(le);
311 }
312 } else {
313 uint32_t num;
314 decode(num, p);
315 while (num--) {
316 LogEntry le;
317 le.decode(p);
318 dout(7) << "update_from_paxos applying incremental log "
319 << summary.version+1 << " " << le << dendl;
320 summary.recent_keys.insert(le.key());
321 summary.channel_info[le.channel].second++;
322 // we may have logged past the (persisted) summary in a prior quorum
323 if (version > external_log_to) {
324 log_external(le);
325 }
326 }
327 map<string,version_t> prune_channels_to;
328 decode(prune_channels_to, p);
329 for (auto& [channel, prune_to] : prune_channels_to) {
330 dout(20) << __func__ << " channel " << channel
331 << " pruned to " << prune_to << dendl;
332 summary.channel_info[channel].first = prune_to;
333 }
334 // zero out pre-quincy fields (encode_pending needs this to reliably detect
335 // upgrade)
336 summary.tail_by_channel.clear();
337 summary.keys.clear();
338 }
339
340 summary.version++;
341 summary.prune(g_conf()->mon_log_max_summary);
342 }
343 dout(10) << " summary.channel_info " << summary.channel_info << dendl;
344 external_log_to = version;
345 mon.store->write_meta("external_log_to", stringify(external_log_to));
346
347 check_subs();
348 }
349
350 void LogMonitor::log_external(const LogEntry& le)
351 {
352 string channel = le.channel;
353 if (channel.empty()) { // keep retrocompatibility
354 channel = CLOG_CHANNEL_CLUSTER;
355 }
356
357 if (g_conf().get_val<bool>("mon_cluster_log_to_stderr")) {
358 cerr << channel << " " << le << std::endl;
359 }
360
361 if (channels.do_log_to_syslog(channel)) {
362 string level = channels.get_level(channel);
363 string facility = channels.get_facility(channel);
364 if (level.empty() || facility.empty()) {
365 derr << __func__ << " unable to log to syslog -- level or facility"
366 << " not defined (level: " << level << ", facility: "
367 << facility << ")" << dendl;
368 } else {
369 le.log_to_syslog(channels.get_level(channel),
370 channels.get_facility(channel));
371 }
372 }
373
374 if (channels.do_log_to_graylog(channel)) {
375 ceph::logging::Graylog::Ref graylog = channels.get_graylog(channel);
376 if (graylog) {
377 graylog->log_log_entry(&le);
378 }
379 dout(7) << "graylog: " << channel << " " << graylog
380 << " host:" << channels.log_to_graylog_host << dendl;
381 }
382
383 if (channels.do_log_to_journald(channel)) {
384 auto &journald = channels.get_journald();
385 journald.log_log_entry(le);
386 dout(7) << "journald: " << channel << dendl;
387 }
388
389 if (g_conf()->mon_cluster_log_to_file) {
390 if (this->log_rotated.exchange(false)) {
391 this->log_external_close_fds();
392 }
393
394 auto p = channel_fds.find(channel);
395 int fd;
396 if (p == channel_fds.end()) {
397 string log_file = channels.get_log_file(channel);
398 dout(20) << __func__ << " logging for channel '" << channel
399 << "' to file '" << log_file << "'" << dendl;
400 if (log_file.empty()) {
401 // do not log this channel
402 fd = -1;
403 } else {
404 fd = ::open(log_file.c_str(), O_WRONLY|O_APPEND|O_CREAT|O_CLOEXEC, 0600);
405 if (fd < 0) {
406 int err = -errno;
407 dout(1) << "unable to write to '" << log_file << "' for channel '"
408 << channel << "': " << cpp_strerror(err) << dendl;
409 } else {
410 channel_fds[channel] = fd;
411 }
412 }
413 } else {
414 fd = p->second;
415 }
416
417 if (fd >= 0) {
418 fmt::format_to(std::back_inserter(file_log_buffer), "{}\n", le);
419 int err = safe_write(fd, file_log_buffer.data(), file_log_buffer.size());
420 file_log_buffer.clear();
421 if (err < 0) {
422 dout(1) << "error writing to '" << channels.get_log_file(channel)
423 << "' for channel '" << channel
424 << ": " << cpp_strerror(err) << dendl;
425 ::close(fd);
426 channel_fds.erase(channel);
427 }
428 }
429 }
430 }
431
432 void LogMonitor::log_external_close_fds()
433 {
434 for (auto& [channel, fd] : channel_fds) {
435 if (fd >= 0) {
436 dout(10) << __func__ << " closing " << channel << " (" << fd << ")" << dendl;
437 ::close(fd);
438 }
439 }
440 channel_fds.clear();
441 }
442
443 /// catch external logs up to summary.version
444 void LogMonitor::log_external_backlog()
445 {
446 if (!external_log_to) {
447 std::string cur_str;
448 int r = mon.store->read_meta("external_log_to", &cur_str);
449 if (r == 0) {
450 external_log_to = std::stoull(cur_str);
451 dout(10) << __func__ << " initialized external_log_to = " << external_log_to
452 << " (recorded log_to position)" << dendl;
453 } else {
454 // pre-quincy, we assumed that anything through summary.version was
455 // logged externally.
456 assert(r == -ENOENT);
457 external_log_to = summary.version;
458 dout(10) << __func__ << " initialized external_log_to = " << external_log_to
459 << " (summary v " << summary.version << ")" << dendl;
460 }
461 }
462 // we may have logged ahead of summary.version, but never ahead of paxos
463 if (external_log_to > get_last_committed()) {
464 derr << __func__ << " rewinding external_log_to from " << external_log_to
465 << " -> " << get_last_committed() << " (sync_force? mon rebuild?)" << dendl;
466 external_log_to = get_last_committed();
467 }
468 if (external_log_to >= summary.version) {
469 return;
470 }
471 if (auto first = get_first_committed(); external_log_to < first) {
472 derr << __func__ << " local logs at " << external_log_to
473 << ", skipping to " << first << dendl;
474 external_log_to = first;
475 // FIXME: write marker in each channel log file?
476 }
477 for (; external_log_to < summary.version; ++external_log_to) {
478 bufferlist bl;
479 int err = get_version(external_log_to+1, bl);
480 ceph_assert(err == 0);
481 ceph_assert(bl.length());
482 auto p = bl.cbegin();
483 __u8 v;
484 decode(v, p);
485 int32_t num = -2;
486 if (v >= 2) {
487 decode(num, p);
488 }
489 while ((num == -2 && !p.end()) || (num >= 0 && num--)) {
490 LogEntry le;
491 le.decode(p);
492 log_external(le);
493 }
494 }
495 mon.store->write_meta("external_log_to", stringify(external_log_to));
496 }
497
498 void LogMonitor::create_pending()
499 {
500 pending_log.clear();
501 pending_keys.clear();
502 dout(10) << "create_pending v " << (get_last_committed() + 1) << dendl;
503 }
504
505 void LogMonitor::generate_logentry_key(
506 const std::string& channel,
507 version_t v,
508 std::string *out)
509 {
510 out->append(channel);
511 out->append("/");
512 char vs[10];
513 snprintf(vs, sizeof(vs), "%08llx", (unsigned long long)v);
514 out->append(vs);
515 }
516
/**
 * Encode the pending log entries as paxos version last_committed + 1 into
 * transaction `t`.
 *
 * Pre-quincy quorum (min_mon_release < quincy): struct_v 1 followed by the
 * pending entries back to back, with no count.
 *
 * Otherwise: struct_v 2, a uint32 entry count, the entries, then a
 * map<channel, version_t> of new per-channel prune bounds. Each entry is
 * also stored under its own generate_logentry_key() key. Entries whose
 * channel exceeds mon_log_max are erased from the store.
 */
void LogMonitor::encode_pending(MonitorDBStore::TransactionRef t)
{
  version_t version = get_last_committed() + 1;
  bufferlist bl;
  dout(10) << __func__ << " v" << version << dendl;

  if (mon.monmap->min_mon_release < ceph_release_t::quincy) {
    // legacy encoding for pre-quincy quorum
    __u8 struct_v = 1;
    encode(struct_v, bl);
    for (auto& p : pending_log) {
      p.second.encode(bl, mon.get_quorum_con_features());
    }
    put_version(t, version, bl);
    put_last_committed(t, version);
    return;
  }

  __u8 struct_v = 2;
  encode(struct_v, bl);

  // first commit after upgrading to quincy?
  // (update_from_paxos clears tail_by_channel after any v2 incremental)
  if (!summary.tail_by_channel.empty()) {
    // include past log entries
    for (auto& p : summary.tail_by_channel) {
      for (auto& q : p.second) {
        pending_log.emplace(make_pair(q.second.stamp, q.second));
      }
    }
  }

  // record new entries
  // pending_channel_info is a scratch copy used only to hand out per-channel
  // sequence numbers for this commit; the summary itself is updated when the
  // commit is replayed in update_from_paxos().
  auto pending_channel_info = summary.channel_info;
  uint32_t num = pending_log.size();
  encode(num, bl);
  dout(20) << __func__ << " writing " << num << " entries" << dendl;
  for (auto& p : pending_log) {
    bufferlist ebl;
    p.second.encode(ebl, mon.get_quorum_con_features());

    // next free sequence number in this entry's channel
    auto& bounds = pending_channel_info[p.second.channel];
    version_t v = bounds.second++;
    std::string key;
    generate_logentry_key(p.second.channel, v, &key);
    t->put(get_service_name(), key, ebl);

    bl.claim_append(ebl);
  }

  // prune log entries?
  // NOTE(review): the decision uses summary.channel_info (counts before this
  // commit), so entries added above only count toward pruning on a later
  // commit — confirm this lag is intended.
  map<string,version_t> prune_channels_to;
  for (auto& [channel, info] : summary.channel_info) {
    if (info.second - info.first > g_conf()->mon_log_max) {
      const version_t from = info.first;
      const version_t to = info.second - g_conf()->mon_log_max;
      dout(10) << __func__ << " pruning channel " << channel
               << " " << from << " -> " << to << dendl;
      prune_channels_to[channel] = to;
      pending_channel_info[channel].first = to;
      // drop the per-entry keys in [from, to)
      for (version_t v = from; v < to; ++v) {
        std::string key;
        generate_logentry_key(channel, v, &key);
        t->erase(get_service_name(), key);
      }
    }
  }
  dout(20) << __func__ << " prune_channels_to " << prune_channels_to << dendl;
  encode(prune_channels_to, bl);

  put_version(t, version, bl);
  put_last_committed(t, version);
}
589
590 bool LogMonitor::should_stash_full()
591 {
592 if (mon.monmap->min_mon_release < ceph_release_t::quincy) {
593 // commit a LogSummary on every commit
594 return true;
595 }
596
597 // store periodic summary
598 auto period = std::min<uint64_t>(
599 g_conf()->mon_log_full_interval,
600 g_conf()->mon_max_log_epochs
601 );
602 return (get_last_committed() - get_version_latest_full() > period);
603 }
604
605
606 void LogMonitor::encode_full(MonitorDBStore::TransactionRef t)
607 {
608 dout(10) << __func__ << " log v " << summary.version << dendl;
609 ceph_assert(get_last_committed() == summary.version);
610
611 bufferlist summary_bl;
612 encode(summary, summary_bl, mon.get_quorum_con_features());
613
614 put_version_full(t, summary.version, summary_bl);
615 put_version_latest_full(t, summary.version);
616 }
617
618 version_t LogMonitor::get_trim_to() const
619 {
620 if (!mon.is_leader())
621 return 0;
622
623 unsigned max = g_conf()->mon_max_log_epochs;
624 version_t version = get_last_committed();
625 if (version > max)
626 return version - max;
627 return 0;
628 }
629
630 bool LogMonitor::preprocess_query(MonOpRequestRef op)
631 {
632 op->mark_logmon_event("preprocess_query");
633 auto m = op->get_req<PaxosServiceMessage>();
634 dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
635 switch (m->get_type()) {
636 case MSG_MON_COMMAND:
637 try {
638 return preprocess_command(op);
639 } catch (const bad_cmd_get& e) {
640 bufferlist bl;
641 mon.reply_command(op, -EINVAL, e.what(), bl, get_last_committed());
642 return true;
643 }
644
645 case MSG_LOG:
646 return preprocess_log(op);
647
648 default:
649 ceph_abort();
650 return true;
651 }
652 }
653
654 bool LogMonitor::prepare_update(MonOpRequestRef op)
655 {
656 op->mark_logmon_event("prepare_update");
657 auto m = op->get_req<PaxosServiceMessage>();
658 dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
659 switch (m->get_type()) {
660 case MSG_MON_COMMAND:
661 try {
662 return prepare_command(op);
663 } catch (const bad_cmd_get& e) {
664 bufferlist bl;
665 mon.reply_command(op, -EINVAL, e.what(), bl, get_last_committed());
666 return true;
667 }
668 case MSG_LOG:
669 return prepare_log(op);
670 default:
671 ceph_abort();
672 return false;
673 }
674 }
675
676 bool LogMonitor::preprocess_log(MonOpRequestRef op)
677 {
678 op->mark_logmon_event("preprocess_log");
679 auto m = op->get_req<MLog>();
680 dout(10) << "preprocess_log " << *m << " from " << m->get_orig_source() << dendl;
681 int num_new = 0;
682
683 MonSession *session = op->get_session();
684 if (!session)
685 goto done;
686 if (!session->is_capable("log", MON_CAP_W)) {
687 dout(0) << "preprocess_log got MLog from entity with insufficient privileges "
688 << session->caps << dendl;
689 goto done;
690 }
691
692 for (auto p = m->entries.begin();
693 p != m->entries.end();
694 ++p) {
695 if (!summary.contains(p->key()))
696 num_new++;
697 }
698 if (!num_new) {
699 dout(10) << " nothing new" << dendl;
700 goto done;
701 }
702
703 return false;
704
705 done:
706 mon.no_reply(op);
707 return true;
708 }
709
710 struct LogMonitor::C_Log : public C_MonOp {
711 LogMonitor *logmon;
712 C_Log(LogMonitor *p, MonOpRequestRef o) :
713 C_MonOp(o), logmon(p) {}
714 void _finish(int r) override {
715 if (r == -ECANCELED) {
716 return;
717 }
718 logmon->_updated_log(op);
719 }
720 };
721
722 bool LogMonitor::prepare_log(MonOpRequestRef op)
723 {
724 op->mark_logmon_event("prepare_log");
725 auto m = op->get_req<MLog>();
726 dout(10) << "prepare_log " << *m << " from " << m->get_orig_source() << dendl;
727
728 if (m->fsid != mon.monmap->fsid) {
729 dout(0) << "handle_log on fsid " << m->fsid << " != " << mon.monmap->fsid
730 << dendl;
731 return false;
732 }
733
734 for (auto p = m->entries.begin();
735 p != m->entries.end();
736 ++p) {
737 dout(10) << " logging " << *p << dendl;
738 if (!summary.contains(p->key()) &&
739 !pending_keys.count(p->key())) {
740 pending_keys.insert(p->key());
741 pending_log.insert(pair<utime_t,LogEntry>(p->stamp, *p));
742 }
743 }
744 wait_for_finished_proposal(op, new C_Log(this, op));
745 return true;
746 }
747
748 void LogMonitor::_updated_log(MonOpRequestRef op)
749 {
750 auto m = op->get_req<MLog>();
751 dout(7) << "_updated_log for " << m->get_orig_source_inst() << dendl;
752 mon.send_reply(op, new MLogAck(m->fsid, m->entries.rbegin()->seq));
753 }
754
755 bool LogMonitor::should_propose(double& delay)
756 {
757 // commit now if we have a lot of pending events
758 if (g_conf()->mon_max_log_entries_per_event > 0 &&
759 pending_log.size() >= (unsigned)g_conf()->mon_max_log_entries_per_event)
760 return true;
761
762 // otherwise fall back to generic policy
763 return PaxosService::should_propose(delay);
764 }
765
766
767 bool LogMonitor::preprocess_command(MonOpRequestRef op)
768 {
769 op->mark_logmon_event("preprocess_command");
770 auto m = op->get_req<MMonCommand>();
771 int r = -EINVAL;
772 bufferlist rdata;
773 stringstream ss;
774
775 cmdmap_t cmdmap;
776 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
777 string rs = ss.str();
778 mon.reply_command(op, -EINVAL, rs, get_last_committed());
779 return true;
780 }
781 MonSession *session = op->get_session();
782 if (!session) {
783 mon.reply_command(op, -EACCES, "access denied", get_last_committed());
784 return true;
785 }
786
787 string prefix;
788 cmd_getval(cmdmap, "prefix", prefix);
789
790 string format = cmd_getval_or<string>(cmdmap, "format", "plain");
791 boost::scoped_ptr<Formatter> f(Formatter::create(format));
792
793 if (prefix == "log last") {
794 int64_t num = 20;
795 cmd_getval(cmdmap, "num", num);
796 if (f) {
797 f->open_array_section("tail");
798 }
799
800 std::string level_str;
801 clog_type level;
802 if (cmd_getval(cmdmap, "level", level_str)) {
803 level = LogEntry::str_to_level(level_str);
804 if (level == CLOG_UNKNOWN) {
805 ss << "Invalid severity '" << level_str << "'";
806 mon.reply_command(op, -EINVAL, ss.str(), get_last_committed());
807 return true;
808 }
809 } else {
810 level = CLOG_INFO;
811 }
812
813 std::string channel;
814 if (!cmd_getval(cmdmap, "channel", channel)) {
815 channel = CLOG_CHANNEL_DEFAULT;
816 }
817
818 // We'll apply this twice, once while counting out lines
819 // and once while outputting them.
820 auto match = [level](const LogEntry &entry) {
821 return entry.prio >= level;
822 };
823
824 ostringstream ss;
825 if (!summary.tail_by_channel.empty()) {
826 // pre-quincy compat
827 // Decrement operation that sets to container end when hitting rbegin
828 if (channel == "*") {
829 list<LogEntry> full_tail;
830 summary.build_ordered_tail_legacy(&full_tail);
831 auto rp = full_tail.rbegin();
832 for (; num > 0 && rp != full_tail.rend(); ++rp) {
833 if (match(*rp)) {
834 num--;
835 }
836 }
837 if (rp == full_tail.rend()) {
838 --rp;
839 }
840
841 // Decrement a reverse iterator such that going past rbegin()
842 // sets it to rend(). This is for writing a for() loop that
843 // goes up to (and including) rbegin()
844 auto dec = [&rp, &full_tail] () {
845 if (rp == full_tail.rbegin()) {
846 rp = full_tail.rend();
847 } else {
848 --rp;
849 }
850 };
851
852 // Move forward to the end of the container (decrement the reverse
853 // iterator).
854 for (; rp != full_tail.rend(); dec()) {
855 if (!match(*rp)) {
856 continue;
857 }
858 if (f) {
859 f->dump_object("entry", *rp);
860 } else {
861 ss << *rp << "\n";
862 }
863 }
864 } else {
865 auto p = summary.tail_by_channel.find(channel);
866 if (p != summary.tail_by_channel.end()) {
867 auto rp = p->second.rbegin();
868 for (; num > 0 && rp != p->second.rend(); ++rp) {
869 if (match(rp->second)) {
870 num--;
871 }
872 }
873 if (rp == p->second.rend()) {
874 --rp;
875 }
876
877 // Decrement a reverse iterator such that going past rbegin()
878 // sets it to rend(). This is for writing a for() loop that
879 // goes up to (and including) rbegin()
880 auto dec = [&rp, &p] () {
881 if (rp == p->second.rbegin()) {
882 rp = p->second.rend();
883 } else {
884 --rp;
885 }
886 };
887
888 // Move forward to the end of the container (decrement the reverse
889 // iterator).
890 for (; rp != p->second.rend(); dec()) {
891 if (!match(rp->second)) {
892 continue;
893 }
894 if (f) {
895 f->dump_object("entry", rp->second);
896 } else {
897 ss << rp->second << "\n";
898 }
899 }
900 }
901 }
902 } else {
903 // quincy+
904 if (channel == "*") {
905 // tail all channels; we need to mix by timestamp
906 multimap<utime_t,LogEntry> entries; // merge+sort all channels by timestamp
907 for (auto& p : summary.channel_info) {
908 version_t from = p.second.first;
909 version_t to = p.second.second;
910 version_t start;
911 if (to > (version_t)num) {
912 start = std::max(to - num, from);
913 } else {
914 start = from;
915 }
916 dout(10) << __func__ << " channnel " << p.first
917 << " from " << from << " to " << to << dendl;
918 for (version_t v = start; v < to; ++v) {
919 bufferlist ebl;
920 string key;
921 generate_logentry_key(p.first, v, &key);
922 int r = mon.store->get(get_service_name(), key, ebl);
923 if (r < 0) {
924 derr << __func__ << " missing key " << key << dendl;
925 continue;
926 }
927 LogEntry le;
928 auto p = ebl.cbegin();
929 decode(le, p);
930 entries.insert(make_pair(le.stamp, le));
931 }
932 }
933 while ((int)entries.size() > num) {
934 entries.erase(entries.begin());
935 }
936 for (auto& p : entries) {
937 if (f) {
938 f->dump_object("entry", p.second);
939 } else {
940 ss << p.second << "\n";
941 }
942 }
943 } else {
944 // tail one channel
945 auto p = summary.channel_info.find(channel);
946 if (p != summary.channel_info.end()) {
947 version_t from = p->second.first;
948 version_t to = p->second.second;
949 version_t start;
950 if (to > (version_t)num) {
951 start = std::max(to - num, from);
952 } else {
953 start = from;
954 }
955 dout(10) << __func__ << " from " << from << " to " << to << dendl;
956 for (version_t v = start; v < to; ++v) {
957 bufferlist ebl;
958 string key;
959 generate_logentry_key(channel, v, &key);
960 int r = mon.store->get(get_service_name(), key, ebl);
961 if (r < 0) {
962 derr << __func__ << " missing key " << key << dendl;
963 continue;
964 }
965 LogEntry le;
966 auto p = ebl.cbegin();
967 decode(le, p);
968 if (f) {
969 f->dump_object("entry", le);
970 } else {
971 ss << le << "\n";
972 }
973 }
974 }
975 }
976 }
977 if (f) {
978 f->close_section();
979 f->flush(rdata);
980 } else {
981 rdata.append(ss.str());
982 }
983 r = 0;
984 } else {
985 return false;
986 }
987
988 string rs;
989 getline(ss, rs);
990 mon.reply_command(op, r, rs, rdata, get_last_committed());
991 return true;
992 }
993
994
995 bool LogMonitor::prepare_command(MonOpRequestRef op)
996 {
997 op->mark_logmon_event("prepare_command");
998 auto m = op->get_req<MMonCommand>();
999 stringstream ss;
1000 string rs;
1001 int err = -EINVAL;
1002
1003 cmdmap_t cmdmap;
1004 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
1005 // ss has reason for failure
1006 string rs = ss.str();
1007 mon.reply_command(op, -EINVAL, rs, get_last_committed());
1008 return true;
1009 }
1010
1011 string prefix;
1012 cmd_getval(cmdmap, "prefix", prefix);
1013
1014 MonSession *session = op->get_session();
1015 if (!session) {
1016 mon.reply_command(op, -EACCES, "access denied", get_last_committed());
1017 return true;
1018 }
1019
1020 if (prefix == "log") {
1021 vector<string> logtext;
1022 cmd_getval(cmdmap, "logtext", logtext);
1023 LogEntry le;
1024 le.rank = m->get_orig_source();
1025 le.addrs.v.push_back(m->get_orig_source_addr());
1026 le.name = session->entity_name;
1027 le.stamp = m->get_recv_stamp();
1028 le.seq = 0;
1029 string level_str = cmd_getval_or<string>(cmdmap, "level", "info");
1030 le.prio = LogEntry::str_to_level(level_str);
1031 le.channel = CLOG_CHANNEL_DEFAULT;
1032 le.msg = str_join(logtext, " ");
1033 pending_keys.insert(le.key());
1034 pending_log.insert(pair<utime_t,LogEntry>(le.stamp, le));
1035 wait_for_finished_proposal(op, new Monitor::C_Command(
1036 mon, op, 0, string(), get_last_committed() + 1));
1037 return true;
1038 }
1039
1040 getline(ss, rs);
1041 mon.reply_command(op, err, rs, get_last_committed());
1042 return false;
1043 }
1044
1045
1046 int LogMonitor::sub_name_to_id(const string& n)
1047 {
1048 if (n.substr(0, 4) == "log-" && n.size() > 4) {
1049 return LogEntry::str_to_level(n.substr(4));
1050 } else {
1051 return CLOG_UNKNOWN;
1052 }
1053 }
1054
1055 void LogMonitor::check_subs()
1056 {
1057 dout(10) << __func__ << dendl;
1058 for (map<string, xlist<Subscription*>*>::iterator i = mon.session_map.subs.begin();
1059 i != mon.session_map.subs.end();
1060 ++i) {
1061 for (xlist<Subscription*>::iterator j = i->second->begin(); !j.end(); ++j) {
1062 if (sub_name_to_id((*j)->type) >= 0)
1063 check_sub(*j);
1064 }
1065 }
1066 }
1067
1068 void LogMonitor::check_sub(Subscription *s)
1069 {
1070 dout(10) << __func__ << " client wants " << s->type << " ver " << s->next << dendl;
1071
1072 int sub_level = sub_name_to_id(s->type);
1073 ceph_assert(sub_level >= 0);
1074
1075 version_t summary_version = summary.version;
1076 if (s->next > summary_version) {
1077 dout(10) << __func__ << " client " << s->session->name
1078 << " requested version (" << s->next << ") is greater than ours ("
1079 << summary_version << "), which means we already sent him"
1080 << " everything we have." << dendl;
1081 return;
1082 }
1083
1084 MLog *mlog = new MLog(mon.monmap->fsid);
1085
1086 if (s->next == 0) {
1087 /* First timer, heh? */
1088 _create_sub_incremental(mlog, sub_level, get_last_committed());
1089 } else {
1090 /* let us send you an incremental log... */
1091 _create_sub_incremental(mlog, sub_level, s->next);
1092 }
1093
1094 dout(10) << __func__ << " sending message to " << s->session->name
1095 << " with " << mlog->entries.size() << " entries"
1096 << " (version " << mlog->version << ")" << dendl;
1097
1098 if (!mlog->entries.empty()) {
1099 s->session->con->send_message(mlog);
1100 } else {
1101 mlog->put();
1102 }
1103 if (s->onetime)
1104 mon.session_map.remove_sub(s);
1105 else
1106 s->next = summary_version+1;
1107 }
1108
1109 /**
1110 * Create an incremental log message from version \p sv to \p summary.version
1111 *
1112 * @param mlog Log message we'll send to the client with the messages received
1113 * since version \p sv, inclusive.
1114 * @param level The max log level of the messages the client is interested in.
1115 * @param sv The version the client is looking for.
1116 */
1117 void LogMonitor::_create_sub_incremental(MLog *mlog, int level, version_t sv)
1118 {
1119 dout(10) << __func__ << " level " << level << " ver " << sv
1120 << " cur summary ver " << summary.version << dendl;
1121
1122 if (sv < get_first_committed()) {
1123 dout(10) << __func__ << " skipped from " << sv
1124 << " to first_committed " << get_first_committed() << dendl;
1125 LogEntry le;
1126 le.stamp = ceph_clock_now();
1127 le.prio = CLOG_WARN;
1128 ostringstream ss;
1129 ss << "skipped log messages from " << sv << " to " << get_first_committed();
1130 le.msg = ss.str();
1131 mlog->entries.push_back(le);
1132 sv = get_first_committed();
1133 }
1134
1135 version_t summary_ver = summary.version;
1136 while (sv && sv <= summary_ver) {
1137 bufferlist bl;
1138 int err = get_version(sv, bl);
1139 ceph_assert(err == 0);
1140 ceph_assert(bl.length());
1141 auto p = bl.cbegin();
1142 __u8 v;
1143 decode(v, p);
1144 int32_t num = -2;
1145 if (v >= 2) {
1146 decode(num, p);
1147 dout(20) << __func__ << " sv " << sv << " has " << num << " entries" << dendl;
1148 }
1149 while ((num == -2 && !p.end()) || (num >= 0 && num--)) {
1150 LogEntry le;
1151 le.decode(p);
1152 if (le.prio < level) {
1153 dout(20) << __func__ << " requested " << level
1154 << ", skipping " << le << dendl;
1155 continue;
1156 }
1157 mlog->entries.push_back(le);
1158 }
1159 mlog->version = sv++;
1160 }
1161
1162 dout(10) << __func__ << " incremental message ready ("
1163 << mlog->entries.size() << " entries)" << dendl;
1164 }
1165
1166 void LogMonitor::update_log_channels()
1167 {
1168 ostringstream oss;
1169
1170 channels.clear();
1171
1172 int r = get_conf_str_map_helper(
1173 g_conf().get_val<string>("mon_cluster_log_to_syslog"),
1174 oss, &channels.log_to_syslog,
1175 CLOG_CONFIG_DEFAULT_KEY);
1176 if (r < 0) {
1177 derr << __func__ << " error parsing 'mon_cluster_log_to_syslog'" << dendl;
1178 return;
1179 }
1180
1181 r = get_conf_str_map_helper(
1182 g_conf().get_val<string>("mon_cluster_log_to_syslog_level"),
1183 oss, &channels.syslog_level,
1184 CLOG_CONFIG_DEFAULT_KEY);
1185 if (r < 0) {
1186 derr << __func__ << " error parsing 'mon_cluster_log_to_syslog_level'"
1187 << dendl;
1188 return;
1189 }
1190
1191 r = get_conf_str_map_helper(
1192 g_conf().get_val<string>("mon_cluster_log_to_syslog_facility"),
1193 oss, &channels.syslog_facility,
1194 CLOG_CONFIG_DEFAULT_KEY);
1195 if (r < 0) {
1196 derr << __func__ << " error parsing 'mon_cluster_log_to_syslog_facility'"
1197 << dendl;
1198 return;
1199 }
1200
1201 r = get_conf_str_map_helper(
1202 g_conf().get_val<string>("mon_cluster_log_file"), oss,
1203 &channels.log_file,
1204 CLOG_CONFIG_DEFAULT_KEY);
1205 if (r < 0) {
1206 derr << __func__ << " error parsing 'mon_cluster_log_file'" << dendl;
1207 return;
1208 }
1209
1210 r = get_conf_str_map_helper(
1211 g_conf().get_val<string>("mon_cluster_log_file_level"), oss,
1212 &channels.log_file_level,
1213 CLOG_CONFIG_DEFAULT_KEY);
1214 if (r < 0) {
1215 derr << __func__ << " error parsing 'mon_cluster_log_file_level'"
1216 << dendl;
1217 return;
1218 }
1219
1220 r = get_conf_str_map_helper(
1221 g_conf().get_val<string>("mon_cluster_log_to_graylog"), oss,
1222 &channels.log_to_graylog,
1223 CLOG_CONFIG_DEFAULT_KEY);
1224 if (r < 0) {
1225 derr << __func__ << " error parsing 'mon_cluster_log_to_graylog'"
1226 << dendl;
1227 return;
1228 }
1229
1230 r = get_conf_str_map_helper(
1231 g_conf().get_val<string>("mon_cluster_log_to_graylog_host"), oss,
1232 &channels.log_to_graylog_host,
1233 CLOG_CONFIG_DEFAULT_KEY);
1234 if (r < 0) {
1235 derr << __func__ << " error parsing 'mon_cluster_log_to_graylog_host'"
1236 << dendl;
1237 return;
1238 }
1239
1240 r = get_conf_str_map_helper(
1241 g_conf().get_val<string>("mon_cluster_log_to_graylog_port"), oss,
1242 &channels.log_to_graylog_port,
1243 CLOG_CONFIG_DEFAULT_KEY);
1244 if (r < 0) {
1245 derr << __func__ << " error parsing 'mon_cluster_log_to_graylog_port'"
1246 << dendl;
1247 return;
1248 }
1249
1250 r = get_conf_str_map_helper(
1251 g_conf().get_val<string>("mon_cluster_log_to_journald"), oss,
1252 &channels.log_to_journald,
1253 CLOG_CONFIG_DEFAULT_KEY);
1254 if (r < 0) {
1255 derr << __func__ << " error parsing 'mon_cluster_log_to_journald'"
1256 << dendl;
1257 return;
1258 }
1259
1260 channels.expand_channel_meta();
1261 log_external_close_fds();
1262 }
1263
1264
1265 void LogMonitor::handle_conf_change(const ConfigProxy& conf,
1266 const std::set<std::string> &changed)
1267 {
1268 if (changed.count("mon_cluster_log_to_syslog") ||
1269 changed.count("mon_cluster_log_to_syslog_level") ||
1270 changed.count("mon_cluster_log_to_syslog_facility") ||
1271 changed.count("mon_cluster_log_file") ||
1272 changed.count("mon_cluster_log_file_level") ||
1273 changed.count("mon_cluster_log_to_graylog") ||
1274 changed.count("mon_cluster_log_to_graylog_host") ||
1275 changed.count("mon_cluster_log_to_graylog_port") ||
1276 changed.count("mon_cluster_log_to_journald") ||
1277 changed.count("mon_cluster_log_to_file")) {
1278 update_log_channels();
1279 }
1280 }