]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #include <boost/algorithm/string/predicate.hpp> | |
16 | ||
17 | #include <sstream> | |
18 | #include <syslog.h> | |
19 | ||
20 | #include "LogMonitor.h" | |
21 | #include "Monitor.h" | |
22 | #include "MonitorDBStore.h" | |
23 | ||
24 | #include "messages/MMonCommand.h" | |
25 | #include "messages/MLog.h" | |
26 | #include "messages/MLogAck.h" | |
27 | #include "common/Graylog.h" | |
28 | #include "common/errno.h" | |
29 | #include "common/strtol.h" | |
30 | #include "include/assert.h" | |
31 | #include "include/str_list.h" | |
32 | #include "include/str_map.h" | |
33 | #include "include/compat.h" | |
34 | ||
35 | #define dout_subsys ceph_subsys_mon | |
36 | #undef dout_prefix | |
37 | #define dout_prefix _prefix(_dout, mon, get_last_committed()) | |
38 | static ostream& _prefix(std::ostream *_dout, Monitor *mon, version_t v) { | |
39 | return *_dout << "mon." << mon->name << "@" << mon->rank | |
40 | << "(" << mon->get_state_name() | |
41 | << ").log v" << v << " "; | |
42 | } | |
43 | ||
44 | ostream& operator<<(ostream &out, const LogMonitor &pm) | |
45 | { | |
46 | return out << "log"; | |
47 | } | |
48 | ||
49 | /* | |
50 | Tick function to update the map based on performance every N seconds | |
51 | */ | |
52 | ||
53 | void LogMonitor::tick() | |
54 | { | |
55 | if (!is_active()) return; | |
56 | ||
57 | dout(10) << *this << dendl; | |
58 | ||
59 | } | |
60 | ||
61 | void LogMonitor::create_initial() | |
62 | { | |
63 | dout(10) << "create_initial -- creating initial map" << dendl; | |
64 | LogEntry e; | |
31f18b77 | 65 | e.name = g_conf->name; |
7c673cae FG |
66 | e.stamp = ceph_clock_now(); |
67 | e.prio = CLOG_INFO; | |
68 | std::stringstream ss; | |
69 | ss << "mkfs " << mon->monmap->get_fsid(); | |
70 | e.msg = ss.str(); | |
71 | e.seq = 0; | |
72 | pending_log.insert(pair<utime_t,LogEntry>(e.stamp, e)); | |
73 | } | |
74 | ||
75 | void LogMonitor::update_from_paxos(bool *need_bootstrap) | |
76 | { | |
77 | dout(10) << __func__ << dendl; | |
78 | version_t version = get_last_committed(); | |
79 | dout(10) << __func__ << " version " << version | |
80 | << " summary v " << summary.version << dendl; | |
81 | if (version == summary.version) | |
82 | return; | |
83 | assert(version >= summary.version); | |
84 | ||
85 | map<string,bufferlist> channel_blog; | |
86 | ||
87 | version_t latest_full = get_version_latest_full(); | |
88 | dout(10) << __func__ << " latest full " << latest_full << dendl; | |
89 | if ((latest_full > 0) && (latest_full > summary.version)) { | |
90 | bufferlist latest_bl; | |
91 | get_version_full(latest_full, latest_bl); | |
92 | assert(latest_bl.length() != 0); | |
93 | dout(7) << __func__ << " loading summary e" << latest_full << dendl; | |
94 | bufferlist::iterator p = latest_bl.begin(); | |
95 | ::decode(summary, p); | |
96 | dout(7) << __func__ << " loaded summary e" << summary.version << dendl; | |
97 | } | |
98 | ||
99 | // walk through incrementals | |
100 | while (version > summary.version) { | |
101 | bufferlist bl; | |
102 | int err = get_version(summary.version+1, bl); | |
103 | assert(err == 0); | |
104 | assert(bl.length()); | |
105 | ||
106 | bufferlist::iterator p = bl.begin(); | |
107 | __u8 v; | |
108 | ::decode(v, p); | |
109 | while (!p.end()) { | |
110 | LogEntry le; | |
111 | le.decode(p); | |
112 | dout(7) << "update_from_paxos applying incremental log " << summary.version+1 << " " << le << dendl; | |
113 | ||
114 | string channel = le.channel; | |
115 | if (channel.empty()) // keep retrocompatibility | |
116 | channel = CLOG_CHANNEL_CLUSTER; | |
117 | ||
b32b8144 FG |
118 | if (g_conf->get_val<bool>("mon_cluster_log_to_stderr")) { |
119 | cerr << channel << " " << le << std::endl; | |
120 | } | |
121 | ||
7c673cae FG |
122 | if (channels.do_log_to_syslog(channel)) { |
123 | string level = channels.get_level(channel); | |
124 | string facility = channels.get_facility(channel); | |
125 | if (level.empty() || facility.empty()) { | |
126 | derr << __func__ << " unable to log to syslog -- level or facility" | |
127 | << " not defined (level: " << level << ", facility: " | |
128 | << facility << ")" << dendl; | |
129 | continue; | |
130 | } | |
131 | le.log_to_syslog(channels.get_level(channel), | |
132 | channels.get_facility(channel)); | |
133 | } | |
134 | ||
135 | if (channels.do_log_to_graylog(channel)) { | |
136 | ceph::logging::Graylog::Ref graylog = channels.get_graylog(channel); | |
137 | if (graylog) { | |
138 | graylog->log_log_entry(&le); | |
139 | } | |
140 | dout(7) << "graylog: " << channel << " " << graylog | |
141 | << " host:" << channels.log_to_graylog_host << dendl; | |
142 | } | |
143 | ||
144 | string log_file = channels.get_log_file(channel); | |
145 | dout(20) << __func__ << " logging for channel '" << channel | |
146 | << "' to file '" << log_file << "'" << dendl; | |
147 | ||
148 | if (!log_file.empty()) { | |
149 | string log_file_level = channels.get_log_file_level(channel); | |
150 | if (log_file_level.empty()) { | |
151 | dout(1) << __func__ << " warning: log file level not defined for" | |
152 | << " channel '" << channel << "' yet a log file is --" | |
153 | << " will assume lowest level possible" << dendl; | |
154 | } | |
155 | ||
156 | int min = string_to_syslog_level(log_file_level); | |
157 | int l = clog_type_to_syslog_level(le.prio); | |
158 | if (l <= min) { | |
159 | stringstream ss; | |
160 | ss << le << "\n"; | |
161 | // init entry if DNE | |
162 | bufferlist &blog = channel_blog[channel]; | |
163 | blog.append(ss.str()); | |
164 | } | |
165 | } | |
166 | ||
167 | summary.add(le); | |
168 | } | |
169 | ||
170 | summary.version++; | |
31f18b77 | 171 | summary.prune(g_conf->mon_log_max_summary); |
7c673cae FG |
172 | } |
173 | ||
174 | dout(15) << __func__ << " logging for " | |
175 | << channel_blog.size() << " channels" << dendl; | |
176 | for(map<string,bufferlist>::iterator p = channel_blog.begin(); | |
177 | p != channel_blog.end(); ++p) { | |
178 | if (!p->second.length()) { | |
179 | dout(15) << __func__ << " channel '" << p->first | |
180 | << "': nothing to log" << dendl; | |
181 | continue; | |
182 | } | |
183 | ||
184 | dout(15) << __func__ << " channel '" << p->first | |
185 | << "' logging " << p->second.length() << " bytes" << dendl; | |
186 | string log_file = channels.get_log_file(p->first); | |
187 | ||
188 | int fd = ::open(log_file.c_str(), O_WRONLY|O_APPEND|O_CREAT, 0600); | |
189 | if (fd < 0) { | |
190 | int err = -errno; | |
191 | dout(1) << "unable to write to '" << log_file << "' for channel '" | |
192 | << p->first << "': " << cpp_strerror(err) << dendl; | |
193 | } else { | |
194 | int err = p->second.write_fd(fd); | |
195 | if (err < 0) { | |
196 | dout(1) << "error writing to '" << log_file << "' for channel '" | |
197 | << p->first << ": " << cpp_strerror(err) << dendl; | |
198 | } | |
199 | VOID_TEMP_FAILURE_RETRY(::close(fd)); | |
200 | } | |
201 | } | |
202 | ||
203 | check_subs(); | |
204 | } | |
205 | ||
206 | void LogMonitor::create_pending() | |
207 | { | |
208 | pending_log.clear(); | |
209 | pending_summary = summary; | |
210 | dout(10) << "create_pending v " << (get_last_committed() + 1) << dendl; | |
211 | } | |
212 | ||
213 | void LogMonitor::encode_pending(MonitorDBStore::TransactionRef t) | |
214 | { | |
215 | version_t version = get_last_committed() + 1; | |
216 | bufferlist bl; | |
217 | dout(10) << __func__ << " v" << version << dendl; | |
218 | __u8 v = 1; | |
219 | ::encode(v, bl); | |
220 | multimap<utime_t,LogEntry>::iterator p; | |
221 | for (p = pending_log.begin(); p != pending_log.end(); ++p) | |
222 | p->second.encode(bl, mon->get_quorum_con_features()); | |
223 | ||
224 | put_version(t, version, bl); | |
225 | put_last_committed(t, version); | |
226 | } | |
227 | ||
228 | void LogMonitor::encode_full(MonitorDBStore::TransactionRef t) | |
229 | { | |
230 | dout(10) << __func__ << " log v " << summary.version << dendl; | |
231 | assert(get_last_committed() == summary.version); | |
232 | ||
233 | bufferlist summary_bl; | |
234 | ::encode(summary, summary_bl, mon->get_quorum_con_features()); | |
235 | ||
236 | put_version_full(t, summary.version, summary_bl); | |
237 | put_version_latest_full(t, summary.version); | |
238 | } | |
239 | ||
240 | version_t LogMonitor::get_trim_to() | |
241 | { | |
242 | if (!mon->is_leader()) | |
243 | return 0; | |
244 | ||
245 | unsigned max = g_conf->mon_max_log_epochs; | |
246 | version_t version = get_last_committed(); | |
247 | if (version > max) | |
248 | return version - max; | |
249 | return 0; | |
250 | } | |
251 | ||
252 | bool LogMonitor::preprocess_query(MonOpRequestRef op) | |
253 | { | |
254 | op->mark_logmon_event("preprocess_query"); | |
255 | PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req()); | |
256 | dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl; | |
257 | switch (m->get_type()) { | |
258 | case MSG_MON_COMMAND: | |
259 | return preprocess_command(op); | |
260 | ||
261 | case MSG_LOG: | |
262 | return preprocess_log(op); | |
263 | ||
264 | default: | |
265 | ceph_abort(); | |
266 | return true; | |
267 | } | |
268 | } | |
269 | ||
270 | bool LogMonitor::prepare_update(MonOpRequestRef op) | |
271 | { | |
272 | op->mark_logmon_event("prepare_update"); | |
273 | PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req()); | |
274 | dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl; | |
275 | switch (m->get_type()) { | |
276 | case MSG_MON_COMMAND: | |
277 | return prepare_command(op); | |
278 | case MSG_LOG: | |
279 | return prepare_log(op); | |
280 | default: | |
281 | ceph_abort(); | |
282 | return false; | |
283 | } | |
284 | } | |
285 | ||
286 | bool LogMonitor::preprocess_log(MonOpRequestRef op) | |
287 | { | |
288 | op->mark_logmon_event("preprocess_log"); | |
289 | MLog *m = static_cast<MLog*>(op->get_req()); | |
290 | dout(10) << "preprocess_log " << *m << " from " << m->get_orig_source() << dendl; | |
291 | int num_new = 0; | |
292 | ||
293 | MonSession *session = m->get_session(); | |
294 | if (!session) | |
295 | goto done; | |
296 | if (!session->is_capable("log", MON_CAP_W)) { | |
297 | dout(0) << "preprocess_log got MLog from entity with insufficient privileges " | |
298 | << session->caps << dendl; | |
299 | goto done; | |
300 | } | |
301 | ||
302 | for (deque<LogEntry>::iterator p = m->entries.begin(); | |
303 | p != m->entries.end(); | |
304 | ++p) { | |
305 | if (!pending_summary.contains(p->key())) | |
306 | num_new++; | |
307 | } | |
308 | if (!num_new) { | |
309 | dout(10) << " nothing new" << dendl; | |
310 | goto done; | |
311 | } | |
312 | ||
313 | return false; | |
314 | ||
315 | done: | |
28e407b8 | 316 | mon->no_reply(op); |
7c673cae FG |
317 | return true; |
318 | } | |
319 | ||
320 | struct LogMonitor::C_Log : public C_MonOp { | |
321 | LogMonitor *logmon; | |
322 | C_Log(LogMonitor *p, MonOpRequestRef o) : | |
323 | C_MonOp(o), logmon(p) {} | |
324 | void _finish(int r) override { | |
325 | if (r == -ECANCELED) { | |
326 | return; | |
327 | } | |
328 | logmon->_updated_log(op); | |
329 | } | |
330 | }; | |
331 | ||
332 | bool LogMonitor::prepare_log(MonOpRequestRef op) | |
333 | { | |
334 | op->mark_logmon_event("prepare_log"); | |
335 | MLog *m = static_cast<MLog*>(op->get_req()); | |
336 | dout(10) << "prepare_log " << *m << " from " << m->get_orig_source() << dendl; | |
337 | ||
338 | if (m->fsid != mon->monmap->fsid) { | |
339 | dout(0) << "handle_log on fsid " << m->fsid << " != " << mon->monmap->fsid | |
340 | << dendl; | |
341 | return false; | |
342 | } | |
343 | ||
344 | for (deque<LogEntry>::iterator p = m->entries.begin(); | |
345 | p != m->entries.end(); | |
346 | ++p) { | |
347 | dout(10) << " logging " << *p << dendl; | |
348 | if (!pending_summary.contains(p->key())) { | |
349 | pending_summary.add(*p); | |
350 | pending_log.insert(pair<utime_t,LogEntry>(p->stamp, *p)); | |
351 | } | |
352 | } | |
31f18b77 | 353 | pending_summary.prune(g_conf->mon_log_max_summary); |
7c673cae FG |
354 | wait_for_finished_proposal(op, new C_Log(this, op)); |
355 | return true; | |
356 | } | |
357 | ||
358 | void LogMonitor::_updated_log(MonOpRequestRef op) | |
359 | { | |
360 | MLog *m = static_cast<MLog*>(op->get_req()); | |
361 | dout(7) << "_updated_log for " << m->get_orig_source_inst() << dendl; | |
362 | mon->send_reply(op, new MLogAck(m->fsid, m->entries.rbegin()->seq)); | |
363 | } | |
364 | ||
365 | bool LogMonitor::should_propose(double& delay) | |
366 | { | |
367 | // commit now if we have a lot of pending events | |
368 | if (g_conf->mon_max_log_entries_per_event > 0 && | |
369 | pending_log.size() >= (unsigned)g_conf->mon_max_log_entries_per_event) | |
370 | return true; | |
371 | ||
372 | // otherwise fall back to generic policy | |
373 | return PaxosService::should_propose(delay); | |
374 | } | |
375 | ||
376 | ||
377 | bool LogMonitor::preprocess_command(MonOpRequestRef op) | |
378 | { | |
379 | op->mark_logmon_event("preprocess_command"); | |
31f18b77 FG |
380 | MMonCommand *m = static_cast<MMonCommand*>(op->get_req()); |
381 | int r = -EINVAL; | |
7c673cae FG |
382 | bufferlist rdata; |
383 | stringstream ss; | |
384 | ||
31f18b77 FG |
385 | map<string, cmd_vartype> cmdmap; |
386 | if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) { | |
387 | string rs = ss.str(); | |
388 | mon->reply_command(op, -EINVAL, rs, get_last_committed()); | |
389 | return true; | |
390 | } | |
391 | MonSession *session = m->get_session(); | |
392 | if (!session) { | |
393 | mon->reply_command(op, -EACCES, "access denied", get_last_committed()); | |
7c673cae | 394 | return true; |
31f18b77 FG |
395 | } |
396 | ||
397 | string prefix; | |
398 | cmd_getval(g_ceph_context, cmdmap, "prefix", prefix); | |
399 | ||
400 | string format; | |
401 | cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain")); | |
402 | boost::scoped_ptr<Formatter> f(Formatter::create(format)); | |
403 | ||
404 | if (prefix == "log last") { | |
405 | int64_t num = 20; | |
406 | cmd_getval(g_ceph_context, cmdmap, "num", num); | |
407 | if (f) { | |
408 | f->open_array_section("tail"); | |
409 | } | |
224ce89b WB |
410 | |
411 | std::string level_str; | |
412 | clog_type level; | |
413 | if (cmd_getval(g_ceph_context, cmdmap, "level", level_str)) { | |
414 | level = LogEntry::str_to_level(level_str); | |
415 | if (level == CLOG_UNKNOWN) { | |
416 | ss << "Invalid severity '" << level_str << "'"; | |
417 | mon->reply_command(op, -EINVAL, ss.str(), get_last_committed()); | |
418 | return true; | |
419 | } | |
420 | } else { | |
421 | level = CLOG_INFO; | |
422 | } | |
423 | ||
424 | std::string channel; | |
425 | if (!cmd_getval(g_ceph_context, cmdmap, "channel", channel)) { | |
426 | channel = CLOG_CHANNEL_DEFAULT; | |
427 | } | |
428 | ||
429 | // We'll apply this twice, once while counting out lines | |
430 | // and once while outputting them. | |
431 | auto match = [level, channel](const LogEntry &entry) { | |
432 | return entry.prio >= level && (entry.channel == channel || channel == "*"); | |
433 | }; | |
434 | ||
c07f9fc5 | 435 | auto rp = summary.tail.rbegin(); |
3efd9988 | 436 | for (; num > 0 && rp != summary.tail.rend(); ++rp) { |
c07f9fc5 | 437 | if (match(*rp)) { |
224ce89b WB |
438 | num--; |
439 | } | |
3efd9988 FG |
440 | } |
441 | if (rp == summary.tail.rend()) { | |
442 | --rp; | |
31f18b77 FG |
443 | } |
444 | ostringstream ss; | |
3efd9988 FG |
445 | for (; rp != summary.tail.rbegin(); --rp) { |
446 | if (!match(*rp)) { | |
224ce89b WB |
447 | continue; |
448 | } | |
449 | ||
31f18b77 | 450 | if (f) { |
3efd9988 | 451 | f->dump_object("entry", *rp); |
31f18b77 | 452 | } else { |
3efd9988 | 453 | ss << *rp << "\n"; |
31f18b77 FG |
454 | } |
455 | } | |
456 | if (f) { | |
457 | f->close_section(); | |
458 | f->flush(rdata); | |
459 | } else { | |
460 | rdata.append(ss.str()); | |
461 | } | |
462 | r = 0; | |
463 | } else { | |
7c673cae | 464 | return false; |
31f18b77 FG |
465 | } |
466 | ||
467 | string rs; | |
468 | getline(ss, rs); | |
469 | mon->reply_command(op, r, rs, rdata, get_last_committed()); | |
470 | return true; | |
7c673cae FG |
471 | } |
472 | ||
473 | ||
474 | bool LogMonitor::prepare_command(MonOpRequestRef op) | |
475 | { | |
476 | op->mark_logmon_event("prepare_command"); | |
477 | MMonCommand *m = static_cast<MMonCommand*>(op->get_req()); | |
478 | stringstream ss; | |
479 | string rs; | |
480 | int err = -EINVAL; | |
481 | ||
482 | map<string, cmd_vartype> cmdmap; | |
483 | if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) { | |
484 | // ss has reason for failure | |
485 | string rs = ss.str(); | |
486 | mon->reply_command(op, -EINVAL, rs, get_last_committed()); | |
487 | return true; | |
488 | } | |
489 | ||
490 | string prefix; | |
491 | cmd_getval(g_ceph_context, cmdmap, "prefix", prefix); | |
492 | ||
493 | MonSession *session = m->get_session(); | |
494 | if (!session) { | |
495 | mon->reply_command(op, -EACCES, "access denied", get_last_committed()); | |
496 | return true; | |
497 | } | |
498 | ||
499 | if (prefix == "log") { | |
500 | vector<string> logtext; | |
501 | cmd_getval(g_ceph_context, cmdmap, "logtext", logtext); | |
502 | LogEntry le; | |
503 | le.who = m->get_orig_source_inst(); | |
31f18b77 | 504 | le.name = session->entity_name; |
7c673cae FG |
505 | le.stamp = m->get_recv_stamp(); |
506 | le.seq = 0; | |
507 | le.prio = CLOG_INFO; | |
224ce89b | 508 | le.channel = CLOG_CHANNEL_DEFAULT; |
7c673cae FG |
509 | le.msg = str_join(logtext, " "); |
510 | pending_summary.add(le); | |
31f18b77 | 511 | pending_summary.prune(g_conf->mon_log_max_summary); |
7c673cae FG |
512 | pending_log.insert(pair<utime_t,LogEntry>(le.stamp, le)); |
513 | wait_for_finished_proposal(op, new Monitor::C_Command( | |
514 | mon, op, 0, string(), get_last_committed() + 1)); | |
515 | return true; | |
516 | } | |
517 | ||
518 | getline(ss, rs); | |
519 | mon->reply_command(op, err, rs, get_last_committed()); | |
520 | return false; | |
521 | } | |
522 | ||
523 | ||
524 | int LogMonitor::sub_name_to_id(const string& n) | |
525 | { | |
224ce89b WB |
526 | if (n.substr(0, 4) == "log-" && n.size() > 4) { |
527 | return LogEntry::str_to_level(n.substr(4)); | |
528 | } else { | |
529 | return CLOG_UNKNOWN; | |
530 | } | |
7c673cae FG |
531 | } |
532 | ||
533 | void LogMonitor::check_subs() | |
534 | { | |
535 | dout(10) << __func__ << dendl; | |
536 | for (map<string, xlist<Subscription*>*>::iterator i = mon->session_map.subs.begin(); | |
537 | i != mon->session_map.subs.end(); | |
538 | ++i) { | |
539 | for (xlist<Subscription*>::iterator j = i->second->begin(); !j.end(); ++j) { | |
540 | if (sub_name_to_id((*j)->type) >= 0) | |
541 | check_sub(*j); | |
542 | } | |
543 | } | |
544 | } | |
545 | ||
546 | void LogMonitor::check_sub(Subscription *s) | |
547 | { | |
548 | dout(10) << __func__ << " client wants " << s->type << " ver " << s->next << dendl; | |
549 | ||
550 | int sub_level = sub_name_to_id(s->type); | |
551 | assert(sub_level >= 0); | |
552 | ||
553 | version_t summary_version = summary.version; | |
554 | if (s->next > summary_version) { | |
555 | dout(10) << __func__ << " client " << s->session->inst | |
556 | << " requested version (" << s->next << ") is greater than ours (" | |
557 | << summary_version << "), which means we already sent him" | |
558 | << " everything we have." << dendl; | |
559 | return; | |
560 | } | |
561 | ||
562 | MLog *mlog = new MLog(mon->monmap->fsid); | |
563 | ||
564 | if (s->next == 0) { | |
565 | /* First timer, heh? */ | |
566 | bool ret = _create_sub_summary(mlog, sub_level); | |
567 | if (!ret) { | |
568 | dout(1) << __func__ << " ret = " << ret << dendl; | |
569 | mlog->put(); | |
570 | return; | |
571 | } | |
572 | } else { | |
573 | /* let us send you an incremental log... */ | |
574 | _create_sub_incremental(mlog, sub_level, s->next); | |
575 | } | |
576 | ||
577 | dout(1) << __func__ << " sending message to " << s->session->inst | |
578 | << " with " << mlog->entries.size() << " entries" | |
579 | << " (version " << mlog->version << ")" << dendl; | |
580 | ||
581 | if (!mlog->entries.empty()) { | |
582 | s->session->con->send_message(mlog); | |
583 | } else { | |
584 | mlog->put(); | |
585 | } | |
586 | if (s->onetime) | |
587 | mon->session_map.remove_sub(s); | |
588 | else | |
589 | s->next = summary_version+1; | |
590 | } | |
591 | ||
592 | /** | |
593 | * Create a log message containing only the last message in the summary. | |
594 | * | |
595 | * @param mlog Log message we'll send to the client. | |
596 | * @param level Maximum log level the client is interested in. | |
597 | * @return 'true' if we consider we successfully populated @mlog; | |
598 | * 'false' otherwise. | |
599 | */ | |
600 | bool LogMonitor::_create_sub_summary(MLog *mlog, int level) | |
601 | { | |
602 | dout(10) << __func__ << dendl; | |
603 | ||
604 | assert(mlog != NULL); | |
605 | ||
606 | if (!summary.tail.size()) | |
607 | return false; | |
608 | ||
609 | list<LogEntry>::reverse_iterator it = summary.tail.rbegin(); | |
610 | for (; it != summary.tail.rend(); ++it) { | |
611 | LogEntry e = *it; | |
612 | if (e.prio < level) | |
613 | continue; | |
614 | ||
615 | mlog->entries.push_back(e); | |
616 | mlog->version = summary.version; | |
617 | break; | |
618 | } | |
619 | ||
620 | return true; | |
621 | } | |
622 | ||
623 | /** | |
624 | * Create an incremental log message from version \p sv to \p summary.version | |
625 | * | |
626 | * @param mlog Log message we'll send to the client with the messages received | |
627 | * since version \p sv, inclusive. | |
628 | * @param level The max log level of the messages the client is interested in. | |
629 | * @param sv The version the client is looking for. | |
630 | */ | |
631 | void LogMonitor::_create_sub_incremental(MLog *mlog, int level, version_t sv) | |
632 | { | |
633 | dout(10) << __func__ << " level " << level << " ver " << sv | |
634 | << " cur summary ver " << summary.version << dendl; | |
635 | ||
636 | if (sv < get_first_committed()) { | |
637 | dout(10) << __func__ << " skipped from " << sv | |
638 | << " to first_committed " << get_first_committed() << dendl; | |
639 | LogEntry le; | |
640 | le.stamp = ceph_clock_now(); | |
641 | le.prio = CLOG_WARN; | |
642 | ostringstream ss; | |
643 | ss << "skipped log messages from " << sv << " to " << get_first_committed(); | |
644 | le.msg = ss.str(); | |
645 | mlog->entries.push_back(le); | |
646 | sv = get_first_committed(); | |
647 | } | |
648 | ||
649 | version_t summary_ver = summary.version; | |
28e407b8 | 650 | while (sv && sv <= summary_ver) { |
7c673cae FG |
651 | bufferlist bl; |
652 | int err = get_version(sv, bl); | |
653 | assert(err == 0); | |
654 | assert(bl.length()); | |
655 | bufferlist::iterator p = bl.begin(); | |
656 | __u8 v; | |
657 | ::decode(v,p); | |
658 | while (!p.end()) { | |
659 | LogEntry le; | |
660 | le.decode(p); | |
661 | ||
662 | if (le.prio < level) { | |
663 | dout(20) << __func__ << " requested " << level | |
664 | << " entry " << le.prio << dendl; | |
665 | continue; | |
666 | } | |
667 | ||
668 | mlog->entries.push_back(le); | |
669 | } | |
670 | mlog->version = sv++; | |
671 | } | |
672 | ||
673 | dout(10) << __func__ << " incremental message ready (" | |
674 | << mlog->entries.size() << " entries)" << dendl; | |
675 | } | |
676 | ||
677 | void LogMonitor::update_log_channels() | |
678 | { | |
679 | ostringstream oss; | |
680 | ||
681 | channels.clear(); | |
682 | ||
683 | int r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_syslog, | |
684 | oss, &channels.log_to_syslog, | |
685 | CLOG_CONFIG_DEFAULT_KEY); | |
686 | if (r < 0) { | |
687 | derr << __func__ << " error parsing 'mon_cluster_log_to_syslog'" << dendl; | |
688 | return; | |
689 | } | |
690 | ||
691 | r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_syslog_level, | |
692 | oss, &channels.syslog_level, | |
693 | CLOG_CONFIG_DEFAULT_KEY); | |
694 | if (r < 0) { | |
695 | derr << __func__ << " error parsing 'mon_cluster_log_to_syslog_level'" | |
696 | << dendl; | |
697 | return; | |
698 | } | |
699 | ||
700 | r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_syslog_facility, | |
701 | oss, &channels.syslog_facility, | |
702 | CLOG_CONFIG_DEFAULT_KEY); | |
703 | if (r < 0) { | |
704 | derr << __func__ << " error parsing 'mon_cluster_log_to_syslog_facility'" | |
705 | << dendl; | |
706 | return; | |
707 | } | |
708 | ||
709 | r = get_conf_str_map_helper(g_conf->mon_cluster_log_file, oss, | |
710 | &channels.log_file, | |
711 | CLOG_CONFIG_DEFAULT_KEY); | |
712 | if (r < 0) { | |
713 | derr << __func__ << " error parsing 'mon_cluster_log_file'" << dendl; | |
714 | return; | |
715 | } | |
716 | ||
717 | r = get_conf_str_map_helper(g_conf->mon_cluster_log_file_level, oss, | |
718 | &channels.log_file_level, | |
719 | CLOG_CONFIG_DEFAULT_KEY); | |
720 | if (r < 0) { | |
721 | derr << __func__ << " error parsing 'mon_cluster_log_file_level'" | |
722 | << dendl; | |
723 | return; | |
724 | } | |
725 | ||
726 | r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog, oss, | |
727 | &channels.log_to_graylog, | |
728 | CLOG_CONFIG_DEFAULT_KEY); | |
729 | if (r < 0) { | |
730 | derr << __func__ << " error parsing 'mon_cluster_log_to_graylog'" | |
731 | << dendl; | |
732 | return; | |
733 | } | |
734 | ||
735 | r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog_host, oss, | |
736 | &channels.log_to_graylog_host, | |
737 | CLOG_CONFIG_DEFAULT_KEY); | |
738 | if (r < 0) { | |
739 | derr << __func__ << " error parsing 'mon_cluster_log_to_graylog_host'" | |
740 | << dendl; | |
741 | return; | |
742 | } | |
743 | ||
744 | r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog_port, oss, | |
745 | &channels.log_to_graylog_port, | |
746 | CLOG_CONFIG_DEFAULT_KEY); | |
747 | if (r < 0) { | |
748 | derr << __func__ << " error parsing 'mon_cluster_log_to_graylog_port'" | |
749 | << dendl; | |
750 | return; | |
751 | } | |
752 | ||
753 | channels.expand_channel_meta(); | |
754 | } | |
755 | ||
756 | void LogMonitor::log_channel_info::expand_channel_meta(map<string,string> &m) | |
757 | { | |
758 | generic_dout(20) << __func__ << " expand map: " << m << dendl; | |
759 | for (map<string,string>::iterator p = m.begin(); p != m.end(); ++p) { | |
760 | m[p->first] = expand_channel_meta(p->second, p->first); | |
761 | } | |
762 | generic_dout(20) << __func__ << " expanded map: " << m << dendl; | |
763 | } | |
764 | ||
765 | string LogMonitor::log_channel_info::expand_channel_meta( | |
766 | const string &input, | |
767 | const string &change_to) | |
768 | { | |
769 | size_t pos = string::npos; | |
770 | string s(input); | |
771 | while ((pos = s.find(LOG_META_CHANNEL)) != string::npos) { | |
772 | string tmp = s.substr(0, pos) + change_to; | |
773 | if (pos+LOG_META_CHANNEL.length() < s.length()) | |
774 | tmp += s.substr(pos+LOG_META_CHANNEL.length()); | |
775 | s = tmp; | |
776 | } | |
777 | generic_dout(20) << __func__ << " from '" << input | |
778 | << "' to '" << s << "'" << dendl; | |
779 | ||
780 | return s; | |
781 | } | |
782 | ||
783 | bool LogMonitor::log_channel_info::do_log_to_syslog(const string &channel) { | |
784 | string v = get_str_map_key(log_to_syslog, channel, | |
785 | &CLOG_CONFIG_DEFAULT_KEY); | |
786 | // We expect booleans, but they are in k/v pairs, kept | |
787 | // as strings, in 'log_to_syslog'. We must ensure | |
788 | // compatibility with existing boolean handling, and so | |
789 | // we are here using a modified version of how | |
790 | // md_config_t::set_val_raw() handles booleans. We will | |
791 | // accept both 'true' and 'false', but will also check for | |
792 | // '1' and '0'. The main distiction between this and the | |
793 | // original code is that we will assume everything not '1', | |
794 | // '0', 'true' or 'false' to be 'false'. | |
795 | bool ret = false; | |
796 | ||
797 | if (boost::iequals(v, "false")) { | |
798 | ret = false; | |
799 | } else if (boost::iequals(v, "true")) { | |
800 | ret = true; | |
801 | } else { | |
802 | std::string err; | |
803 | int b = strict_strtol(v.c_str(), 10, &err); | |
804 | ret = (err.empty() && b == 1); | |
805 | } | |
806 | ||
807 | return ret; | |
808 | } | |
809 | ||
810 | ceph::logging::Graylog::Ref LogMonitor::log_channel_info::get_graylog( | |
811 | const string &channel) | |
812 | { | |
813 | generic_dout(25) << __func__ << " for channel '" | |
814 | << channel << "'" << dendl; | |
815 | ||
816 | if (graylogs.count(channel) == 0) { | |
817 | auto graylog(std::make_shared<ceph::logging::Graylog>("mon")); | |
818 | ||
3efd9988 | 819 | graylog->set_fsid(g_conf->get_val<uuid_d>("fsid")); |
7c673cae FG |
820 | graylog->set_hostname(g_conf->host); |
821 | graylog->set_destination(get_str_map_key(log_to_graylog_host, channel, | |
822 | &CLOG_CONFIG_DEFAULT_KEY), | |
823 | atoi(get_str_map_key(log_to_graylog_port, channel, | |
824 | &CLOG_CONFIG_DEFAULT_KEY).c_str())); | |
825 | ||
826 | graylogs[channel] = graylog; | |
827 | generic_dout(20) << __func__ << " for channel '" | |
828 | << channel << "' to graylog host '" | |
829 | << log_to_graylog_host[channel] << ":" | |
830 | << log_to_graylog_port[channel] | |
831 | << "'" << dendl; | |
832 | } | |
833 | return graylogs[channel]; | |
834 | } | |
835 | ||
836 | void LogMonitor::handle_conf_change(const struct md_config_t *conf, | |
837 | const std::set<std::string> &changed) | |
838 | { | |
839 | if (changed.count("mon_cluster_log_to_syslog") || | |
840 | changed.count("mon_cluster_log_to_syslog_level") || | |
841 | changed.count("mon_cluster_log_to_syslog_facility") || | |
842 | changed.count("mon_cluster_log_file") || | |
843 | changed.count("mon_cluster_log_file_level") || | |
844 | changed.count("mon_cluster_log_to_graylog") || | |
845 | changed.count("mon_cluster_log_to_graylog_host") || | |
846 | changed.count("mon_cluster_log_to_graylog_port")) { | |
847 | update_log_channels(); | |
848 | } | |
849 | } |