]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #include <boost/algorithm/string/predicate.hpp> | |
16 | ||
17 | #include <sstream> | |
18 | #include <syslog.h> | |
19 | ||
20 | #include "LogMonitor.h" | |
21 | #include "Monitor.h" | |
22 | #include "MonitorDBStore.h" | |
23 | ||
24 | #include "messages/MMonCommand.h" | |
25 | #include "messages/MLog.h" | |
26 | #include "messages/MLogAck.h" | |
27 | #include "common/Graylog.h" | |
28 | #include "common/errno.h" | |
29 | #include "common/strtol.h" | |
30 | #include "include/assert.h" | |
31 | #include "include/str_list.h" | |
32 | #include "include/str_map.h" | |
33 | #include "include/compat.h" | |
34 | ||
35 | #define dout_subsys ceph_subsys_mon | |
36 | #undef dout_prefix | |
37 | #define dout_prefix _prefix(_dout, mon, get_last_committed()) | |
38 | static ostream& _prefix(std::ostream *_dout, Monitor *mon, version_t v) { | |
39 | return *_dout << "mon." << mon->name << "@" << mon->rank | |
40 | << "(" << mon->get_state_name() | |
41 | << ").log v" << v << " "; | |
42 | } | |
43 | ||
44 | ostream& operator<<(ostream &out, const LogMonitor &pm) | |
45 | { | |
46 | return out << "log"; | |
47 | } | |
48 | ||
49 | /* | |
50 | Tick function to update the map based on performance every N seconds | |
51 | */ | |
52 | ||
53 | void LogMonitor::tick() | |
54 | { | |
55 | if (!is_active()) return; | |
56 | ||
57 | dout(10) << *this << dendl; | |
58 | ||
59 | } | |
60 | ||
61 | void LogMonitor::create_initial() | |
62 | { | |
63 | dout(10) << "create_initial -- creating initial map" << dendl; | |
64 | LogEntry e; | |
65 | memset(&e.who, 0, sizeof(e.who)); | |
31f18b77 | 66 | e.name = g_conf->name; |
7c673cae FG |
67 | e.stamp = ceph_clock_now(); |
68 | e.prio = CLOG_INFO; | |
69 | std::stringstream ss; | |
70 | ss << "mkfs " << mon->monmap->get_fsid(); | |
71 | e.msg = ss.str(); | |
72 | e.seq = 0; | |
73 | pending_log.insert(pair<utime_t,LogEntry>(e.stamp, e)); | |
74 | } | |
75 | ||
76 | void LogMonitor::update_from_paxos(bool *need_bootstrap) | |
77 | { | |
78 | dout(10) << __func__ << dendl; | |
79 | version_t version = get_last_committed(); | |
80 | dout(10) << __func__ << " version " << version | |
81 | << " summary v " << summary.version << dendl; | |
82 | if (version == summary.version) | |
83 | return; | |
84 | assert(version >= summary.version); | |
85 | ||
86 | map<string,bufferlist> channel_blog; | |
87 | ||
88 | version_t latest_full = get_version_latest_full(); | |
89 | dout(10) << __func__ << " latest full " << latest_full << dendl; | |
90 | if ((latest_full > 0) && (latest_full > summary.version)) { | |
91 | bufferlist latest_bl; | |
92 | get_version_full(latest_full, latest_bl); | |
93 | assert(latest_bl.length() != 0); | |
94 | dout(7) << __func__ << " loading summary e" << latest_full << dendl; | |
95 | bufferlist::iterator p = latest_bl.begin(); | |
96 | ::decode(summary, p); | |
97 | dout(7) << __func__ << " loaded summary e" << summary.version << dendl; | |
98 | } | |
99 | ||
100 | // walk through incrementals | |
101 | while (version > summary.version) { | |
102 | bufferlist bl; | |
103 | int err = get_version(summary.version+1, bl); | |
104 | assert(err == 0); | |
105 | assert(bl.length()); | |
106 | ||
107 | bufferlist::iterator p = bl.begin(); | |
108 | __u8 v; | |
109 | ::decode(v, p); | |
110 | while (!p.end()) { | |
111 | LogEntry le; | |
112 | le.decode(p); | |
113 | dout(7) << "update_from_paxos applying incremental log " << summary.version+1 << " " << le << dendl; | |
114 | ||
115 | string channel = le.channel; | |
116 | if (channel.empty()) // keep retrocompatibility | |
117 | channel = CLOG_CHANNEL_CLUSTER; | |
118 | ||
119 | if (channels.do_log_to_syslog(channel)) { | |
120 | string level = channels.get_level(channel); | |
121 | string facility = channels.get_facility(channel); | |
122 | if (level.empty() || facility.empty()) { | |
123 | derr << __func__ << " unable to log to syslog -- level or facility" | |
124 | << " not defined (level: " << level << ", facility: " | |
125 | << facility << ")" << dendl; | |
126 | continue; | |
127 | } | |
128 | le.log_to_syslog(channels.get_level(channel), | |
129 | channels.get_facility(channel)); | |
130 | } | |
131 | ||
132 | if (channels.do_log_to_graylog(channel)) { | |
133 | ceph::logging::Graylog::Ref graylog = channels.get_graylog(channel); | |
134 | if (graylog) { | |
135 | graylog->log_log_entry(&le); | |
136 | } | |
137 | dout(7) << "graylog: " << channel << " " << graylog | |
138 | << " host:" << channels.log_to_graylog_host << dendl; | |
139 | } | |
140 | ||
141 | string log_file = channels.get_log_file(channel); | |
142 | dout(20) << __func__ << " logging for channel '" << channel | |
143 | << "' to file '" << log_file << "'" << dendl; | |
144 | ||
145 | if (!log_file.empty()) { | |
146 | string log_file_level = channels.get_log_file_level(channel); | |
147 | if (log_file_level.empty()) { | |
148 | dout(1) << __func__ << " warning: log file level not defined for" | |
149 | << " channel '" << channel << "' yet a log file is --" | |
150 | << " will assume lowest level possible" << dendl; | |
151 | } | |
152 | ||
153 | int min = string_to_syslog_level(log_file_level); | |
154 | int l = clog_type_to_syslog_level(le.prio); | |
155 | if (l <= min) { | |
156 | stringstream ss; | |
157 | ss << le << "\n"; | |
158 | // init entry if DNE | |
159 | bufferlist &blog = channel_blog[channel]; | |
160 | blog.append(ss.str()); | |
161 | } | |
162 | } | |
163 | ||
164 | summary.add(le); | |
165 | } | |
166 | ||
167 | summary.version++; | |
31f18b77 | 168 | summary.prune(g_conf->mon_log_max_summary); |
7c673cae FG |
169 | } |
170 | ||
171 | dout(15) << __func__ << " logging for " | |
172 | << channel_blog.size() << " channels" << dendl; | |
173 | for(map<string,bufferlist>::iterator p = channel_blog.begin(); | |
174 | p != channel_blog.end(); ++p) { | |
175 | if (!p->second.length()) { | |
176 | dout(15) << __func__ << " channel '" << p->first | |
177 | << "': nothing to log" << dendl; | |
178 | continue; | |
179 | } | |
180 | ||
181 | dout(15) << __func__ << " channel '" << p->first | |
182 | << "' logging " << p->second.length() << " bytes" << dendl; | |
183 | string log_file = channels.get_log_file(p->first); | |
184 | ||
185 | int fd = ::open(log_file.c_str(), O_WRONLY|O_APPEND|O_CREAT, 0600); | |
186 | if (fd < 0) { | |
187 | int err = -errno; | |
188 | dout(1) << "unable to write to '" << log_file << "' for channel '" | |
189 | << p->first << "': " << cpp_strerror(err) << dendl; | |
190 | } else { | |
191 | int err = p->second.write_fd(fd); | |
192 | if (err < 0) { | |
193 | dout(1) << "error writing to '" << log_file << "' for channel '" | |
194 | << p->first << ": " << cpp_strerror(err) << dendl; | |
195 | } | |
196 | VOID_TEMP_FAILURE_RETRY(::close(fd)); | |
197 | } | |
198 | } | |
199 | ||
200 | check_subs(); | |
201 | } | |
202 | ||
203 | void LogMonitor::create_pending() | |
204 | { | |
205 | pending_log.clear(); | |
206 | pending_summary = summary; | |
207 | dout(10) << "create_pending v " << (get_last_committed() + 1) << dendl; | |
208 | } | |
209 | ||
210 | void LogMonitor::encode_pending(MonitorDBStore::TransactionRef t) | |
211 | { | |
212 | version_t version = get_last_committed() + 1; | |
213 | bufferlist bl; | |
214 | dout(10) << __func__ << " v" << version << dendl; | |
215 | __u8 v = 1; | |
216 | ::encode(v, bl); | |
217 | multimap<utime_t,LogEntry>::iterator p; | |
218 | for (p = pending_log.begin(); p != pending_log.end(); ++p) | |
219 | p->second.encode(bl, mon->get_quorum_con_features()); | |
220 | ||
221 | put_version(t, version, bl); | |
222 | put_last_committed(t, version); | |
223 | } | |
224 | ||
225 | void LogMonitor::encode_full(MonitorDBStore::TransactionRef t) | |
226 | { | |
227 | dout(10) << __func__ << " log v " << summary.version << dendl; | |
228 | assert(get_last_committed() == summary.version); | |
229 | ||
230 | bufferlist summary_bl; | |
231 | ::encode(summary, summary_bl, mon->get_quorum_con_features()); | |
232 | ||
233 | put_version_full(t, summary.version, summary_bl); | |
234 | put_version_latest_full(t, summary.version); | |
235 | } | |
236 | ||
237 | version_t LogMonitor::get_trim_to() | |
238 | { | |
239 | if (!mon->is_leader()) | |
240 | return 0; | |
241 | ||
242 | unsigned max = g_conf->mon_max_log_epochs; | |
243 | version_t version = get_last_committed(); | |
244 | if (version > max) | |
245 | return version - max; | |
246 | return 0; | |
247 | } | |
248 | ||
249 | bool LogMonitor::preprocess_query(MonOpRequestRef op) | |
250 | { | |
251 | op->mark_logmon_event("preprocess_query"); | |
252 | PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req()); | |
253 | dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl; | |
254 | switch (m->get_type()) { | |
255 | case MSG_MON_COMMAND: | |
256 | return preprocess_command(op); | |
257 | ||
258 | case MSG_LOG: | |
259 | return preprocess_log(op); | |
260 | ||
261 | default: | |
262 | ceph_abort(); | |
263 | return true; | |
264 | } | |
265 | } | |
266 | ||
267 | bool LogMonitor::prepare_update(MonOpRequestRef op) | |
268 | { | |
269 | op->mark_logmon_event("prepare_update"); | |
270 | PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req()); | |
271 | dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl; | |
272 | switch (m->get_type()) { | |
273 | case MSG_MON_COMMAND: | |
274 | return prepare_command(op); | |
275 | case MSG_LOG: | |
276 | return prepare_log(op); | |
277 | default: | |
278 | ceph_abort(); | |
279 | return false; | |
280 | } | |
281 | } | |
282 | ||
283 | bool LogMonitor::preprocess_log(MonOpRequestRef op) | |
284 | { | |
285 | op->mark_logmon_event("preprocess_log"); | |
286 | MLog *m = static_cast<MLog*>(op->get_req()); | |
287 | dout(10) << "preprocess_log " << *m << " from " << m->get_orig_source() << dendl; | |
288 | int num_new = 0; | |
289 | ||
290 | MonSession *session = m->get_session(); | |
291 | if (!session) | |
292 | goto done; | |
293 | if (!session->is_capable("log", MON_CAP_W)) { | |
294 | dout(0) << "preprocess_log got MLog from entity with insufficient privileges " | |
295 | << session->caps << dendl; | |
296 | goto done; | |
297 | } | |
298 | ||
299 | for (deque<LogEntry>::iterator p = m->entries.begin(); | |
300 | p != m->entries.end(); | |
301 | ++p) { | |
302 | if (!pending_summary.contains(p->key())) | |
303 | num_new++; | |
304 | } | |
305 | if (!num_new) { | |
306 | dout(10) << " nothing new" << dendl; | |
307 | goto done; | |
308 | } | |
309 | ||
310 | return false; | |
311 | ||
312 | done: | |
313 | return true; | |
314 | } | |
315 | ||
316 | struct LogMonitor::C_Log : public C_MonOp { | |
317 | LogMonitor *logmon; | |
318 | C_Log(LogMonitor *p, MonOpRequestRef o) : | |
319 | C_MonOp(o), logmon(p) {} | |
320 | void _finish(int r) override { | |
321 | if (r == -ECANCELED) { | |
322 | return; | |
323 | } | |
324 | logmon->_updated_log(op); | |
325 | } | |
326 | }; | |
327 | ||
328 | bool LogMonitor::prepare_log(MonOpRequestRef op) | |
329 | { | |
330 | op->mark_logmon_event("prepare_log"); | |
331 | MLog *m = static_cast<MLog*>(op->get_req()); | |
332 | dout(10) << "prepare_log " << *m << " from " << m->get_orig_source() << dendl; | |
333 | ||
334 | if (m->fsid != mon->monmap->fsid) { | |
335 | dout(0) << "handle_log on fsid " << m->fsid << " != " << mon->monmap->fsid | |
336 | << dendl; | |
337 | return false; | |
338 | } | |
339 | ||
340 | for (deque<LogEntry>::iterator p = m->entries.begin(); | |
341 | p != m->entries.end(); | |
342 | ++p) { | |
343 | dout(10) << " logging " << *p << dendl; | |
344 | if (!pending_summary.contains(p->key())) { | |
345 | pending_summary.add(*p); | |
346 | pending_log.insert(pair<utime_t,LogEntry>(p->stamp, *p)); | |
347 | } | |
348 | } | |
31f18b77 | 349 | pending_summary.prune(g_conf->mon_log_max_summary); |
7c673cae FG |
350 | wait_for_finished_proposal(op, new C_Log(this, op)); |
351 | return true; | |
352 | } | |
353 | ||
354 | void LogMonitor::_updated_log(MonOpRequestRef op) | |
355 | { | |
356 | MLog *m = static_cast<MLog*>(op->get_req()); | |
357 | dout(7) << "_updated_log for " << m->get_orig_source_inst() << dendl; | |
358 | mon->send_reply(op, new MLogAck(m->fsid, m->entries.rbegin()->seq)); | |
359 | } | |
360 | ||
361 | bool LogMonitor::should_propose(double& delay) | |
362 | { | |
363 | // commit now if we have a lot of pending events | |
364 | if (g_conf->mon_max_log_entries_per_event > 0 && | |
365 | pending_log.size() >= (unsigned)g_conf->mon_max_log_entries_per_event) | |
366 | return true; | |
367 | ||
368 | // otherwise fall back to generic policy | |
369 | return PaxosService::should_propose(delay); | |
370 | } | |
371 | ||
372 | ||
373 | bool LogMonitor::preprocess_command(MonOpRequestRef op) | |
374 | { | |
375 | op->mark_logmon_event("preprocess_command"); | |
31f18b77 FG |
376 | MMonCommand *m = static_cast<MMonCommand*>(op->get_req()); |
377 | int r = -EINVAL; | |
7c673cae FG |
378 | bufferlist rdata; |
379 | stringstream ss; | |
380 | ||
31f18b77 FG |
381 | map<string, cmd_vartype> cmdmap; |
382 | if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) { | |
383 | string rs = ss.str(); | |
384 | mon->reply_command(op, -EINVAL, rs, get_last_committed()); | |
385 | return true; | |
386 | } | |
387 | MonSession *session = m->get_session(); | |
388 | if (!session) { | |
389 | mon->reply_command(op, -EACCES, "access denied", get_last_committed()); | |
7c673cae | 390 | return true; |
31f18b77 FG |
391 | } |
392 | ||
393 | string prefix; | |
394 | cmd_getval(g_ceph_context, cmdmap, "prefix", prefix); | |
395 | ||
396 | string format; | |
397 | cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain")); | |
398 | boost::scoped_ptr<Formatter> f(Formatter::create(format)); | |
399 | ||
400 | if (prefix == "log last") { | |
401 | int64_t num = 20; | |
402 | cmd_getval(g_ceph_context, cmdmap, "num", num); | |
403 | if (f) { | |
404 | f->open_array_section("tail"); | |
405 | } | |
224ce89b WB |
406 | |
407 | std::string level_str; | |
408 | clog_type level; | |
409 | if (cmd_getval(g_ceph_context, cmdmap, "level", level_str)) { | |
410 | level = LogEntry::str_to_level(level_str); | |
411 | if (level == CLOG_UNKNOWN) { | |
412 | ss << "Invalid severity '" << level_str << "'"; | |
413 | mon->reply_command(op, -EINVAL, ss.str(), get_last_committed()); | |
414 | return true; | |
415 | } | |
416 | } else { | |
417 | level = CLOG_INFO; | |
418 | } | |
419 | ||
420 | std::string channel; | |
421 | if (!cmd_getval(g_ceph_context, cmdmap, "channel", channel)) { | |
422 | channel = CLOG_CHANNEL_DEFAULT; | |
423 | } | |
424 | ||
425 | // We'll apply this twice, once while counting out lines | |
426 | // and once while outputting them. | |
427 | auto match = [level, channel](const LogEntry &entry) { | |
428 | return entry.prio >= level && (entry.channel == channel || channel == "*"); | |
429 | }; | |
430 | ||
c07f9fc5 FG |
431 | auto rp = summary.tail.rbegin(); |
432 | while (num > 0 && rp != summary.tail.rend()) { | |
433 | if (match(*rp)) { | |
224ce89b WB |
434 | num--; |
435 | } | |
c07f9fc5 | 436 | ++rp; |
31f18b77 FG |
437 | } |
438 | ostringstream ss; | |
c07f9fc5 | 439 | auto p = summary.tail.begin(); |
31f18b77 | 440 | for ( ; p != summary.tail.end(); ++p) { |
224ce89b WB |
441 | if (!match(*p)) { |
442 | continue; | |
443 | } | |
444 | ||
31f18b77 FG |
445 | if (f) { |
446 | f->dump_object("entry", *p); | |
447 | } else { | |
448 | ss << *p << "\n"; | |
449 | } | |
450 | } | |
451 | if (f) { | |
452 | f->close_section(); | |
453 | f->flush(rdata); | |
454 | } else { | |
455 | rdata.append(ss.str()); | |
456 | } | |
457 | r = 0; | |
458 | } else { | |
7c673cae | 459 | return false; |
31f18b77 FG |
460 | } |
461 | ||
462 | string rs; | |
463 | getline(ss, rs); | |
464 | mon->reply_command(op, r, rs, rdata, get_last_committed()); | |
465 | return true; | |
7c673cae FG |
466 | } |
467 | ||
468 | ||
469 | bool LogMonitor::prepare_command(MonOpRequestRef op) | |
470 | { | |
471 | op->mark_logmon_event("prepare_command"); | |
472 | MMonCommand *m = static_cast<MMonCommand*>(op->get_req()); | |
473 | stringstream ss; | |
474 | string rs; | |
475 | int err = -EINVAL; | |
476 | ||
477 | map<string, cmd_vartype> cmdmap; | |
478 | if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) { | |
479 | // ss has reason for failure | |
480 | string rs = ss.str(); | |
481 | mon->reply_command(op, -EINVAL, rs, get_last_committed()); | |
482 | return true; | |
483 | } | |
484 | ||
485 | string prefix; | |
486 | cmd_getval(g_ceph_context, cmdmap, "prefix", prefix); | |
487 | ||
488 | MonSession *session = m->get_session(); | |
489 | if (!session) { | |
490 | mon->reply_command(op, -EACCES, "access denied", get_last_committed()); | |
491 | return true; | |
492 | } | |
493 | ||
494 | if (prefix == "log") { | |
495 | vector<string> logtext; | |
496 | cmd_getval(g_ceph_context, cmdmap, "logtext", logtext); | |
497 | LogEntry le; | |
498 | le.who = m->get_orig_source_inst(); | |
31f18b77 | 499 | le.name = session->entity_name; |
7c673cae FG |
500 | le.stamp = m->get_recv_stamp(); |
501 | le.seq = 0; | |
502 | le.prio = CLOG_INFO; | |
224ce89b | 503 | le.channel = CLOG_CHANNEL_DEFAULT; |
7c673cae FG |
504 | le.msg = str_join(logtext, " "); |
505 | pending_summary.add(le); | |
31f18b77 | 506 | pending_summary.prune(g_conf->mon_log_max_summary); |
7c673cae FG |
507 | pending_log.insert(pair<utime_t,LogEntry>(le.stamp, le)); |
508 | wait_for_finished_proposal(op, new Monitor::C_Command( | |
509 | mon, op, 0, string(), get_last_committed() + 1)); | |
510 | return true; | |
511 | } | |
512 | ||
513 | getline(ss, rs); | |
514 | mon->reply_command(op, err, rs, get_last_committed()); | |
515 | return false; | |
516 | } | |
517 | ||
518 | ||
519 | int LogMonitor::sub_name_to_id(const string& n) | |
520 | { | |
224ce89b WB |
521 | if (n.substr(0, 4) == "log-" && n.size() > 4) { |
522 | return LogEntry::str_to_level(n.substr(4)); | |
523 | } else { | |
524 | return CLOG_UNKNOWN; | |
525 | } | |
7c673cae FG |
526 | } |
527 | ||
528 | void LogMonitor::check_subs() | |
529 | { | |
530 | dout(10) << __func__ << dendl; | |
531 | for (map<string, xlist<Subscription*>*>::iterator i = mon->session_map.subs.begin(); | |
532 | i != mon->session_map.subs.end(); | |
533 | ++i) { | |
534 | for (xlist<Subscription*>::iterator j = i->second->begin(); !j.end(); ++j) { | |
535 | if (sub_name_to_id((*j)->type) >= 0) | |
536 | check_sub(*j); | |
537 | } | |
538 | } | |
539 | } | |
540 | ||
541 | void LogMonitor::check_sub(Subscription *s) | |
542 | { | |
543 | dout(10) << __func__ << " client wants " << s->type << " ver " << s->next << dendl; | |
544 | ||
545 | int sub_level = sub_name_to_id(s->type); | |
546 | assert(sub_level >= 0); | |
547 | ||
548 | version_t summary_version = summary.version; | |
549 | if (s->next > summary_version) { | |
550 | dout(10) << __func__ << " client " << s->session->inst | |
551 | << " requested version (" << s->next << ") is greater than ours (" | |
552 | << summary_version << "), which means we already sent him" | |
553 | << " everything we have." << dendl; | |
554 | return; | |
555 | } | |
556 | ||
557 | MLog *mlog = new MLog(mon->monmap->fsid); | |
558 | ||
559 | if (s->next == 0) { | |
560 | /* First timer, heh? */ | |
561 | bool ret = _create_sub_summary(mlog, sub_level); | |
562 | if (!ret) { | |
563 | dout(1) << __func__ << " ret = " << ret << dendl; | |
564 | mlog->put(); | |
565 | return; | |
566 | } | |
567 | } else { | |
568 | /* let us send you an incremental log... */ | |
569 | _create_sub_incremental(mlog, sub_level, s->next); | |
570 | } | |
571 | ||
572 | dout(1) << __func__ << " sending message to " << s->session->inst | |
573 | << " with " << mlog->entries.size() << " entries" | |
574 | << " (version " << mlog->version << ")" << dendl; | |
575 | ||
576 | if (!mlog->entries.empty()) { | |
577 | s->session->con->send_message(mlog); | |
578 | } else { | |
579 | mlog->put(); | |
580 | } | |
581 | if (s->onetime) | |
582 | mon->session_map.remove_sub(s); | |
583 | else | |
584 | s->next = summary_version+1; | |
585 | } | |
586 | ||
587 | /** | |
588 | * Create a log message containing only the last message in the summary. | |
589 | * | |
590 | * @param mlog Log message we'll send to the client. | |
591 | * @param level Maximum log level the client is interested in. | |
592 | * @return 'true' if we consider we successfully populated @mlog; | |
593 | * 'false' otherwise. | |
594 | */ | |
595 | bool LogMonitor::_create_sub_summary(MLog *mlog, int level) | |
596 | { | |
597 | dout(10) << __func__ << dendl; | |
598 | ||
599 | assert(mlog != NULL); | |
600 | ||
601 | if (!summary.tail.size()) | |
602 | return false; | |
603 | ||
604 | list<LogEntry>::reverse_iterator it = summary.tail.rbegin(); | |
605 | for (; it != summary.tail.rend(); ++it) { | |
606 | LogEntry e = *it; | |
607 | if (e.prio < level) | |
608 | continue; | |
609 | ||
610 | mlog->entries.push_back(e); | |
611 | mlog->version = summary.version; | |
612 | break; | |
613 | } | |
614 | ||
615 | return true; | |
616 | } | |
617 | ||
618 | /** | |
619 | * Create an incremental log message from version \p sv to \p summary.version | |
620 | * | |
621 | * @param mlog Log message we'll send to the client with the messages received | |
622 | * since version \p sv, inclusive. | |
623 | * @param level The max log level of the messages the client is interested in. | |
624 | * @param sv The version the client is looking for. | |
625 | */ | |
626 | void LogMonitor::_create_sub_incremental(MLog *mlog, int level, version_t sv) | |
627 | { | |
628 | dout(10) << __func__ << " level " << level << " ver " << sv | |
629 | << " cur summary ver " << summary.version << dendl; | |
630 | ||
631 | if (sv < get_first_committed()) { | |
632 | dout(10) << __func__ << " skipped from " << sv | |
633 | << " to first_committed " << get_first_committed() << dendl; | |
634 | LogEntry le; | |
635 | le.stamp = ceph_clock_now(); | |
636 | le.prio = CLOG_WARN; | |
637 | ostringstream ss; | |
638 | ss << "skipped log messages from " << sv << " to " << get_first_committed(); | |
639 | le.msg = ss.str(); | |
640 | mlog->entries.push_back(le); | |
641 | sv = get_first_committed(); | |
642 | } | |
643 | ||
644 | version_t summary_ver = summary.version; | |
645 | while (sv <= summary_ver) { | |
646 | bufferlist bl; | |
647 | int err = get_version(sv, bl); | |
648 | assert(err == 0); | |
649 | assert(bl.length()); | |
650 | bufferlist::iterator p = bl.begin(); | |
651 | __u8 v; | |
652 | ::decode(v,p); | |
653 | while (!p.end()) { | |
654 | LogEntry le; | |
655 | le.decode(p); | |
656 | ||
657 | if (le.prio < level) { | |
658 | dout(20) << __func__ << " requested " << level | |
659 | << " entry " << le.prio << dendl; | |
660 | continue; | |
661 | } | |
662 | ||
663 | mlog->entries.push_back(le); | |
664 | } | |
665 | mlog->version = sv++; | |
666 | } | |
667 | ||
668 | dout(10) << __func__ << " incremental message ready (" | |
669 | << mlog->entries.size() << " entries)" << dendl; | |
670 | } | |
671 | ||
672 | void LogMonitor::update_log_channels() | |
673 | { | |
674 | ostringstream oss; | |
675 | ||
676 | channels.clear(); | |
677 | ||
678 | int r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_syslog, | |
679 | oss, &channels.log_to_syslog, | |
680 | CLOG_CONFIG_DEFAULT_KEY); | |
681 | if (r < 0) { | |
682 | derr << __func__ << " error parsing 'mon_cluster_log_to_syslog'" << dendl; | |
683 | return; | |
684 | } | |
685 | ||
686 | r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_syslog_level, | |
687 | oss, &channels.syslog_level, | |
688 | CLOG_CONFIG_DEFAULT_KEY); | |
689 | if (r < 0) { | |
690 | derr << __func__ << " error parsing 'mon_cluster_log_to_syslog_level'" | |
691 | << dendl; | |
692 | return; | |
693 | } | |
694 | ||
695 | r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_syslog_facility, | |
696 | oss, &channels.syslog_facility, | |
697 | CLOG_CONFIG_DEFAULT_KEY); | |
698 | if (r < 0) { | |
699 | derr << __func__ << " error parsing 'mon_cluster_log_to_syslog_facility'" | |
700 | << dendl; | |
701 | return; | |
702 | } | |
703 | ||
704 | r = get_conf_str_map_helper(g_conf->mon_cluster_log_file, oss, | |
705 | &channels.log_file, | |
706 | CLOG_CONFIG_DEFAULT_KEY); | |
707 | if (r < 0) { | |
708 | derr << __func__ << " error parsing 'mon_cluster_log_file'" << dendl; | |
709 | return; | |
710 | } | |
711 | ||
712 | r = get_conf_str_map_helper(g_conf->mon_cluster_log_file_level, oss, | |
713 | &channels.log_file_level, | |
714 | CLOG_CONFIG_DEFAULT_KEY); | |
715 | if (r < 0) { | |
716 | derr << __func__ << " error parsing 'mon_cluster_log_file_level'" | |
717 | << dendl; | |
718 | return; | |
719 | } | |
720 | ||
721 | r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog, oss, | |
722 | &channels.log_to_graylog, | |
723 | CLOG_CONFIG_DEFAULT_KEY); | |
724 | if (r < 0) { | |
725 | derr << __func__ << " error parsing 'mon_cluster_log_to_graylog'" | |
726 | << dendl; | |
727 | return; | |
728 | } | |
729 | ||
730 | r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog_host, oss, | |
731 | &channels.log_to_graylog_host, | |
732 | CLOG_CONFIG_DEFAULT_KEY); | |
733 | if (r < 0) { | |
734 | derr << __func__ << " error parsing 'mon_cluster_log_to_graylog_host'" | |
735 | << dendl; | |
736 | return; | |
737 | } | |
738 | ||
739 | r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog_port, oss, | |
740 | &channels.log_to_graylog_port, | |
741 | CLOG_CONFIG_DEFAULT_KEY); | |
742 | if (r < 0) { | |
743 | derr << __func__ << " error parsing 'mon_cluster_log_to_graylog_port'" | |
744 | << dendl; | |
745 | return; | |
746 | } | |
747 | ||
748 | channels.expand_channel_meta(); | |
749 | } | |
750 | ||
751 | void LogMonitor::log_channel_info::expand_channel_meta(map<string,string> &m) | |
752 | { | |
753 | generic_dout(20) << __func__ << " expand map: " << m << dendl; | |
754 | for (map<string,string>::iterator p = m.begin(); p != m.end(); ++p) { | |
755 | m[p->first] = expand_channel_meta(p->second, p->first); | |
756 | } | |
757 | generic_dout(20) << __func__ << " expanded map: " << m << dendl; | |
758 | } | |
759 | ||
760 | string LogMonitor::log_channel_info::expand_channel_meta( | |
761 | const string &input, | |
762 | const string &change_to) | |
763 | { | |
764 | size_t pos = string::npos; | |
765 | string s(input); | |
766 | while ((pos = s.find(LOG_META_CHANNEL)) != string::npos) { | |
767 | string tmp = s.substr(0, pos) + change_to; | |
768 | if (pos+LOG_META_CHANNEL.length() < s.length()) | |
769 | tmp += s.substr(pos+LOG_META_CHANNEL.length()); | |
770 | s = tmp; | |
771 | } | |
772 | generic_dout(20) << __func__ << " from '" << input | |
773 | << "' to '" << s << "'" << dendl; | |
774 | ||
775 | return s; | |
776 | } | |
777 | ||
778 | bool LogMonitor::log_channel_info::do_log_to_syslog(const string &channel) { | |
779 | string v = get_str_map_key(log_to_syslog, channel, | |
780 | &CLOG_CONFIG_DEFAULT_KEY); | |
781 | // We expect booleans, but they are in k/v pairs, kept | |
782 | // as strings, in 'log_to_syslog'. We must ensure | |
783 | // compatibility with existing boolean handling, and so | |
784 | // we are here using a modified version of how | |
785 | // md_config_t::set_val_raw() handles booleans. We will | |
786 | // accept both 'true' and 'false', but will also check for | |
787 | // '1' and '0'. The main distiction between this and the | |
788 | // original code is that we will assume everything not '1', | |
789 | // '0', 'true' or 'false' to be 'false'. | |
790 | bool ret = false; | |
791 | ||
792 | if (boost::iequals(v, "false")) { | |
793 | ret = false; | |
794 | } else if (boost::iequals(v, "true")) { | |
795 | ret = true; | |
796 | } else { | |
797 | std::string err; | |
798 | int b = strict_strtol(v.c_str(), 10, &err); | |
799 | ret = (err.empty() && b == 1); | |
800 | } | |
801 | ||
802 | return ret; | |
803 | } | |
804 | ||
805 | ceph::logging::Graylog::Ref LogMonitor::log_channel_info::get_graylog( | |
806 | const string &channel) | |
807 | { | |
808 | generic_dout(25) << __func__ << " for channel '" | |
809 | << channel << "'" << dendl; | |
810 | ||
811 | if (graylogs.count(channel) == 0) { | |
812 | auto graylog(std::make_shared<ceph::logging::Graylog>("mon")); | |
813 | ||
814 | graylog->set_fsid(g_conf->fsid); | |
815 | graylog->set_hostname(g_conf->host); | |
816 | graylog->set_destination(get_str_map_key(log_to_graylog_host, channel, | |
817 | &CLOG_CONFIG_DEFAULT_KEY), | |
818 | atoi(get_str_map_key(log_to_graylog_port, channel, | |
819 | &CLOG_CONFIG_DEFAULT_KEY).c_str())); | |
820 | ||
821 | graylogs[channel] = graylog; | |
822 | generic_dout(20) << __func__ << " for channel '" | |
823 | << channel << "' to graylog host '" | |
824 | << log_to_graylog_host[channel] << ":" | |
825 | << log_to_graylog_port[channel] | |
826 | << "'" << dendl; | |
827 | } | |
828 | return graylogs[channel]; | |
829 | } | |
830 | ||
831 | void LogMonitor::handle_conf_change(const struct md_config_t *conf, | |
832 | const std::set<std::string> &changed) | |
833 | { | |
834 | if (changed.count("mon_cluster_log_to_syslog") || | |
835 | changed.count("mon_cluster_log_to_syslog_level") || | |
836 | changed.count("mon_cluster_log_to_syslog_facility") || | |
837 | changed.count("mon_cluster_log_file") || | |
838 | changed.count("mon_cluster_log_file_level") || | |
839 | changed.count("mon_cluster_log_to_graylog") || | |
840 | changed.count("mon_cluster_log_to_graylog_host") || | |
841 | changed.count("mon_cluster_log_to_graylog_port")) { | |
842 | update_log_channels(); | |
843 | } | |
844 | } |