]>
Commit | Line | Data |
---|---|---|
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- | |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #include <boost/algorithm/string/predicate.hpp> | |
16 | ||
17 | #include <sstream> | |
18 | #include <syslog.h> | |
19 | ||
20 | #include "LogMonitor.h" | |
21 | #include "Monitor.h" | |
22 | #include "MonitorDBStore.h" | |
23 | ||
24 | #include "messages/MMonCommand.h" | |
25 | #include "messages/MLog.h" | |
26 | #include "messages/MLogAck.h" | |
27 | #include "common/Graylog.h" | |
28 | #include "common/errno.h" | |
29 | #include "common/strtol.h" | |
30 | #include "include/assert.h" | |
31 | #include "include/str_list.h" | |
32 | #include "include/str_map.h" | |
33 | #include "include/compat.h" | |
34 | ||
35 | #define dout_subsys ceph_subsys_mon | |
36 | #undef dout_prefix | |
37 | #define dout_prefix _prefix(_dout, mon, get_last_committed()) | |
38 | static ostream& _prefix(std::ostream *_dout, Monitor *mon, version_t v) { | |
39 | return *_dout << "mon." << mon->name << "@" << mon->rank | |
40 | << "(" << mon->get_state_name() | |
41 | << ").log v" << v << " "; | |
42 | } | |
43 | ||
44 | ostream& operator<<(ostream &out, const LogMonitor &pm) | |
45 | { | |
46 | return out << "log"; | |
47 | } | |
48 | ||
49 | /* | |
50 | Tick function to update the map based on performance every N seconds | |
51 | */ | |
52 | ||
53 | void LogMonitor::tick() | |
54 | { | |
55 | if (!is_active()) return; | |
56 | ||
57 | dout(10) << *this << dendl; | |
58 | ||
59 | } | |
60 | ||
61 | void LogMonitor::create_initial() | |
62 | { | |
63 | dout(10) << "create_initial -- creating initial map" << dendl; | |
64 | LogEntry e; | |
65 | memset(&e.who, 0, sizeof(e.who)); | |
66 | e.name = g_conf->name; | |
67 | e.stamp = ceph_clock_now(); | |
68 | e.prio = CLOG_INFO; | |
69 | std::stringstream ss; | |
70 | ss << "mkfs " << mon->monmap->get_fsid(); | |
71 | e.msg = ss.str(); | |
72 | e.seq = 0; | |
73 | pending_log.insert(pair<utime_t,LogEntry>(e.stamp, e)); | |
74 | } | |
75 | ||
76 | void LogMonitor::update_from_paxos(bool *need_bootstrap) | |
77 | { | |
78 | dout(10) << __func__ << dendl; | |
79 | version_t version = get_last_committed(); | |
80 | dout(10) << __func__ << " version " << version | |
81 | << " summary v " << summary.version << dendl; | |
82 | if (version == summary.version) | |
83 | return; | |
84 | assert(version >= summary.version); | |
85 | ||
86 | map<string,bufferlist> channel_blog; | |
87 | ||
88 | version_t latest_full = get_version_latest_full(); | |
89 | dout(10) << __func__ << " latest full " << latest_full << dendl; | |
90 | if ((latest_full > 0) && (latest_full > summary.version)) { | |
91 | bufferlist latest_bl; | |
92 | get_version_full(latest_full, latest_bl); | |
93 | assert(latest_bl.length() != 0); | |
94 | dout(7) << __func__ << " loading summary e" << latest_full << dendl; | |
95 | bufferlist::iterator p = latest_bl.begin(); | |
96 | ::decode(summary, p); | |
97 | dout(7) << __func__ << " loaded summary e" << summary.version << dendl; | |
98 | } | |
99 | ||
100 | // walk through incrementals | |
101 | while (version > summary.version) { | |
102 | bufferlist bl; | |
103 | int err = get_version(summary.version+1, bl); | |
104 | assert(err == 0); | |
105 | assert(bl.length()); | |
106 | ||
107 | bufferlist::iterator p = bl.begin(); | |
108 | __u8 v; | |
109 | ::decode(v, p); | |
110 | while (!p.end()) { | |
111 | LogEntry le; | |
112 | le.decode(p); | |
113 | dout(7) << "update_from_paxos applying incremental log " << summary.version+1 << " " << le << dendl; | |
114 | ||
115 | string channel = le.channel; | |
116 | if (channel.empty()) // keep retrocompatibility | |
117 | channel = CLOG_CHANNEL_CLUSTER; | |
118 | ||
119 | if (g_conf->get_val<bool>("mon_cluster_log_to_stderr")) { | |
120 | cerr << channel << " " << le << std::endl; | |
121 | } | |
122 | ||
123 | if (channels.do_log_to_syslog(channel)) { | |
124 | string level = channels.get_level(channel); | |
125 | string facility = channels.get_facility(channel); | |
126 | if (level.empty() || facility.empty()) { | |
127 | derr << __func__ << " unable to log to syslog -- level or facility" | |
128 | << " not defined (level: " << level << ", facility: " | |
129 | << facility << ")" << dendl; | |
130 | continue; | |
131 | } | |
132 | le.log_to_syslog(channels.get_level(channel), | |
133 | channels.get_facility(channel)); | |
134 | } | |
135 | ||
136 | if (channels.do_log_to_graylog(channel)) { | |
137 | ceph::logging::Graylog::Ref graylog = channels.get_graylog(channel); | |
138 | if (graylog) { | |
139 | graylog->log_log_entry(&le); | |
140 | } | |
141 | dout(7) << "graylog: " << channel << " " << graylog | |
142 | << " host:" << channels.log_to_graylog_host << dendl; | |
143 | } | |
144 | ||
145 | string log_file = channels.get_log_file(channel); | |
146 | dout(20) << __func__ << " logging for channel '" << channel | |
147 | << "' to file '" << log_file << "'" << dendl; | |
148 | ||
149 | if (!log_file.empty()) { | |
150 | string log_file_level = channels.get_log_file_level(channel); | |
151 | if (log_file_level.empty()) { | |
152 | dout(1) << __func__ << " warning: log file level not defined for" | |
153 | << " channel '" << channel << "' yet a log file is --" | |
154 | << " will assume lowest level possible" << dendl; | |
155 | } | |
156 | ||
157 | int min = string_to_syslog_level(log_file_level); | |
158 | int l = clog_type_to_syslog_level(le.prio); | |
159 | if (l <= min) { | |
160 | stringstream ss; | |
161 | ss << le << "\n"; | |
162 | // init entry if DNE | |
163 | bufferlist &blog = channel_blog[channel]; | |
164 | blog.append(ss.str()); | |
165 | } | |
166 | } | |
167 | ||
168 | summary.add(le); | |
169 | } | |
170 | ||
171 | summary.version++; | |
172 | summary.prune(g_conf->mon_log_max_summary); | |
173 | } | |
174 | ||
175 | dout(15) << __func__ << " logging for " | |
176 | << channel_blog.size() << " channels" << dendl; | |
177 | for(map<string,bufferlist>::iterator p = channel_blog.begin(); | |
178 | p != channel_blog.end(); ++p) { | |
179 | if (!p->second.length()) { | |
180 | dout(15) << __func__ << " channel '" << p->first | |
181 | << "': nothing to log" << dendl; | |
182 | continue; | |
183 | } | |
184 | ||
185 | dout(15) << __func__ << " channel '" << p->first | |
186 | << "' logging " << p->second.length() << " bytes" << dendl; | |
187 | string log_file = channels.get_log_file(p->first); | |
188 | ||
189 | int fd = ::open(log_file.c_str(), O_WRONLY|O_APPEND|O_CREAT, 0600); | |
190 | if (fd < 0) { | |
191 | int err = -errno; | |
192 | dout(1) << "unable to write to '" << log_file << "' for channel '" | |
193 | << p->first << "': " << cpp_strerror(err) << dendl; | |
194 | } else { | |
195 | int err = p->second.write_fd(fd); | |
196 | if (err < 0) { | |
197 | dout(1) << "error writing to '" << log_file << "' for channel '" | |
198 | << p->first << ": " << cpp_strerror(err) << dendl; | |
199 | } | |
200 | VOID_TEMP_FAILURE_RETRY(::close(fd)); | |
201 | } | |
202 | } | |
203 | ||
204 | check_subs(); | |
205 | } | |
206 | ||
207 | void LogMonitor::create_pending() | |
208 | { | |
209 | pending_log.clear(); | |
210 | pending_summary = summary; | |
211 | dout(10) << "create_pending v " << (get_last_committed() + 1) << dendl; | |
212 | } | |
213 | ||
214 | void LogMonitor::encode_pending(MonitorDBStore::TransactionRef t) | |
215 | { | |
216 | version_t version = get_last_committed() + 1; | |
217 | bufferlist bl; | |
218 | dout(10) << __func__ << " v" << version << dendl; | |
219 | __u8 v = 1; | |
220 | ::encode(v, bl); | |
221 | multimap<utime_t,LogEntry>::iterator p; | |
222 | for (p = pending_log.begin(); p != pending_log.end(); ++p) | |
223 | p->second.encode(bl, mon->get_quorum_con_features()); | |
224 | ||
225 | put_version(t, version, bl); | |
226 | put_last_committed(t, version); | |
227 | } | |
228 | ||
229 | void LogMonitor::encode_full(MonitorDBStore::TransactionRef t) | |
230 | { | |
231 | dout(10) << __func__ << " log v " << summary.version << dendl; | |
232 | assert(get_last_committed() == summary.version); | |
233 | ||
234 | bufferlist summary_bl; | |
235 | ::encode(summary, summary_bl, mon->get_quorum_con_features()); | |
236 | ||
237 | put_version_full(t, summary.version, summary_bl); | |
238 | put_version_latest_full(t, summary.version); | |
239 | } | |
240 | ||
241 | version_t LogMonitor::get_trim_to() | |
242 | { | |
243 | if (!mon->is_leader()) | |
244 | return 0; | |
245 | ||
246 | unsigned max = g_conf->mon_max_log_epochs; | |
247 | version_t version = get_last_committed(); | |
248 | if (version > max) | |
249 | return version - max; | |
250 | return 0; | |
251 | } | |
252 | ||
253 | bool LogMonitor::preprocess_query(MonOpRequestRef op) | |
254 | { | |
255 | op->mark_logmon_event("preprocess_query"); | |
256 | PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req()); | |
257 | dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl; | |
258 | switch (m->get_type()) { | |
259 | case MSG_MON_COMMAND: | |
260 | return preprocess_command(op); | |
261 | ||
262 | case MSG_LOG: | |
263 | return preprocess_log(op); | |
264 | ||
265 | default: | |
266 | ceph_abort(); | |
267 | return true; | |
268 | } | |
269 | } | |
270 | ||
271 | bool LogMonitor::prepare_update(MonOpRequestRef op) | |
272 | { | |
273 | op->mark_logmon_event("prepare_update"); | |
274 | PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req()); | |
275 | dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl; | |
276 | switch (m->get_type()) { | |
277 | case MSG_MON_COMMAND: | |
278 | return prepare_command(op); | |
279 | case MSG_LOG: | |
280 | return prepare_log(op); | |
281 | default: | |
282 | ceph_abort(); | |
283 | return false; | |
284 | } | |
285 | } | |
286 | ||
287 | bool LogMonitor::preprocess_log(MonOpRequestRef op) | |
288 | { | |
289 | op->mark_logmon_event("preprocess_log"); | |
290 | MLog *m = static_cast<MLog*>(op->get_req()); | |
291 | dout(10) << "preprocess_log " << *m << " from " << m->get_orig_source() << dendl; | |
292 | int num_new = 0; | |
293 | ||
294 | MonSession *session = m->get_session(); | |
295 | if (!session) | |
296 | goto done; | |
297 | if (!session->is_capable("log", MON_CAP_W)) { | |
298 | dout(0) << "preprocess_log got MLog from entity with insufficient privileges " | |
299 | << session->caps << dendl; | |
300 | goto done; | |
301 | } | |
302 | ||
303 | for (deque<LogEntry>::iterator p = m->entries.begin(); | |
304 | p != m->entries.end(); | |
305 | ++p) { | |
306 | if (!pending_summary.contains(p->key())) | |
307 | num_new++; | |
308 | } | |
309 | if (!num_new) { | |
310 | dout(10) << " nothing new" << dendl; | |
311 | goto done; | |
312 | } | |
313 | ||
314 | return false; | |
315 | ||
316 | done: | |
317 | mon->no_reply(op); | |
318 | return true; | |
319 | } | |
320 | ||
321 | struct LogMonitor::C_Log : public C_MonOp { | |
322 | LogMonitor *logmon; | |
323 | C_Log(LogMonitor *p, MonOpRequestRef o) : | |
324 | C_MonOp(o), logmon(p) {} | |
325 | void _finish(int r) override { | |
326 | if (r == -ECANCELED) { | |
327 | return; | |
328 | } | |
329 | logmon->_updated_log(op); | |
330 | } | |
331 | }; | |
332 | ||
333 | bool LogMonitor::prepare_log(MonOpRequestRef op) | |
334 | { | |
335 | op->mark_logmon_event("prepare_log"); | |
336 | MLog *m = static_cast<MLog*>(op->get_req()); | |
337 | dout(10) << "prepare_log " << *m << " from " << m->get_orig_source() << dendl; | |
338 | ||
339 | if (m->fsid != mon->monmap->fsid) { | |
340 | dout(0) << "handle_log on fsid " << m->fsid << " != " << mon->monmap->fsid | |
341 | << dendl; | |
342 | return false; | |
343 | } | |
344 | ||
345 | for (deque<LogEntry>::iterator p = m->entries.begin(); | |
346 | p != m->entries.end(); | |
347 | ++p) { | |
348 | dout(10) << " logging " << *p << dendl; | |
349 | if (!pending_summary.contains(p->key())) { | |
350 | pending_summary.add(*p); | |
351 | pending_log.insert(pair<utime_t,LogEntry>(p->stamp, *p)); | |
352 | } | |
353 | } | |
354 | pending_summary.prune(g_conf->mon_log_max_summary); | |
355 | wait_for_finished_proposal(op, new C_Log(this, op)); | |
356 | return true; | |
357 | } | |
358 | ||
359 | void LogMonitor::_updated_log(MonOpRequestRef op) | |
360 | { | |
361 | MLog *m = static_cast<MLog*>(op->get_req()); | |
362 | dout(7) << "_updated_log for " << m->get_orig_source_inst() << dendl; | |
363 | mon->send_reply(op, new MLogAck(m->fsid, m->entries.rbegin()->seq)); | |
364 | } | |
365 | ||
366 | bool LogMonitor::should_propose(double& delay) | |
367 | { | |
368 | // commit now if we have a lot of pending events | |
369 | if (g_conf->mon_max_log_entries_per_event > 0 && | |
370 | pending_log.size() >= (unsigned)g_conf->mon_max_log_entries_per_event) | |
371 | return true; | |
372 | ||
373 | // otherwise fall back to generic policy | |
374 | return PaxosService::should_propose(delay); | |
375 | } | |
376 | ||
377 | ||
378 | bool LogMonitor::preprocess_command(MonOpRequestRef op) | |
379 | { | |
380 | op->mark_logmon_event("preprocess_command"); | |
381 | MMonCommand *m = static_cast<MMonCommand*>(op->get_req()); | |
382 | int r = -EINVAL; | |
383 | bufferlist rdata; | |
384 | stringstream ss; | |
385 | ||
386 | map<string, cmd_vartype> cmdmap; | |
387 | if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) { | |
388 | string rs = ss.str(); | |
389 | mon->reply_command(op, -EINVAL, rs, get_last_committed()); | |
390 | return true; | |
391 | } | |
392 | MonSession *session = m->get_session(); | |
393 | if (!session) { | |
394 | mon->reply_command(op, -EACCES, "access denied", get_last_committed()); | |
395 | return true; | |
396 | } | |
397 | ||
398 | string prefix; | |
399 | cmd_getval(g_ceph_context, cmdmap, "prefix", prefix); | |
400 | ||
401 | string format; | |
402 | cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain")); | |
403 | boost::scoped_ptr<Formatter> f(Formatter::create(format)); | |
404 | ||
405 | if (prefix == "log last") { | |
406 | int64_t num = 20; | |
407 | cmd_getval(g_ceph_context, cmdmap, "num", num); | |
408 | if (f) { | |
409 | f->open_array_section("tail"); | |
410 | } | |
411 | ||
412 | std::string level_str; | |
413 | clog_type level; | |
414 | if (cmd_getval(g_ceph_context, cmdmap, "level", level_str)) { | |
415 | level = LogEntry::str_to_level(level_str); | |
416 | if (level == CLOG_UNKNOWN) { | |
417 | ss << "Invalid severity '" << level_str << "'"; | |
418 | mon->reply_command(op, -EINVAL, ss.str(), get_last_committed()); | |
419 | return true; | |
420 | } | |
421 | } else { | |
422 | level = CLOG_INFO; | |
423 | } | |
424 | ||
425 | std::string channel; | |
426 | if (!cmd_getval(g_ceph_context, cmdmap, "channel", channel)) { | |
427 | channel = CLOG_CHANNEL_DEFAULT; | |
428 | } | |
429 | ||
430 | // We'll apply this twice, once while counting out lines | |
431 | // and once while outputting them. | |
432 | auto match = [level, channel](const LogEntry &entry) { | |
433 | return entry.prio >= level && (entry.channel == channel || channel == "*"); | |
434 | }; | |
435 | ||
436 | auto rp = summary.tail.rbegin(); | |
437 | for (; num > 0 && rp != summary.tail.rend(); ++rp) { | |
438 | if (match(*rp)) { | |
439 | num--; | |
440 | } | |
441 | } | |
442 | if (rp == summary.tail.rend()) { | |
443 | --rp; | |
444 | } | |
445 | ostringstream ss; | |
446 | for (; rp != summary.tail.rbegin(); --rp) { | |
447 | if (!match(*rp)) { | |
448 | continue; | |
449 | } | |
450 | ||
451 | if (f) { | |
452 | f->dump_object("entry", *rp); | |
453 | } else { | |
454 | ss << *rp << "\n"; | |
455 | } | |
456 | } | |
457 | if (f) { | |
458 | f->close_section(); | |
459 | f->flush(rdata); | |
460 | } else { | |
461 | rdata.append(ss.str()); | |
462 | } | |
463 | r = 0; | |
464 | } else { | |
465 | return false; | |
466 | } | |
467 | ||
468 | string rs; | |
469 | getline(ss, rs); | |
470 | mon->reply_command(op, r, rs, rdata, get_last_committed()); | |
471 | return true; | |
472 | } | |
473 | ||
474 | ||
475 | bool LogMonitor::prepare_command(MonOpRequestRef op) | |
476 | { | |
477 | op->mark_logmon_event("prepare_command"); | |
478 | MMonCommand *m = static_cast<MMonCommand*>(op->get_req()); | |
479 | stringstream ss; | |
480 | string rs; | |
481 | int err = -EINVAL; | |
482 | ||
483 | map<string, cmd_vartype> cmdmap; | |
484 | if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) { | |
485 | // ss has reason for failure | |
486 | string rs = ss.str(); | |
487 | mon->reply_command(op, -EINVAL, rs, get_last_committed()); | |
488 | return true; | |
489 | } | |
490 | ||
491 | string prefix; | |
492 | cmd_getval(g_ceph_context, cmdmap, "prefix", prefix); | |
493 | ||
494 | MonSession *session = m->get_session(); | |
495 | if (!session) { | |
496 | mon->reply_command(op, -EACCES, "access denied", get_last_committed()); | |
497 | return true; | |
498 | } | |
499 | ||
500 | if (prefix == "log") { | |
501 | vector<string> logtext; | |
502 | cmd_getval(g_ceph_context, cmdmap, "logtext", logtext); | |
503 | LogEntry le; | |
504 | le.who = m->get_orig_source_inst(); | |
505 | le.name = session->entity_name; | |
506 | le.stamp = m->get_recv_stamp(); | |
507 | le.seq = 0; | |
508 | le.prio = CLOG_INFO; | |
509 | le.channel = CLOG_CHANNEL_DEFAULT; | |
510 | le.msg = str_join(logtext, " "); | |
511 | pending_summary.add(le); | |
512 | pending_summary.prune(g_conf->mon_log_max_summary); | |
513 | pending_log.insert(pair<utime_t,LogEntry>(le.stamp, le)); | |
514 | wait_for_finished_proposal(op, new Monitor::C_Command( | |
515 | mon, op, 0, string(), get_last_committed() + 1)); | |
516 | return true; | |
517 | } | |
518 | ||
519 | getline(ss, rs); | |
520 | mon->reply_command(op, err, rs, get_last_committed()); | |
521 | return false; | |
522 | } | |
523 | ||
524 | ||
525 | int LogMonitor::sub_name_to_id(const string& n) | |
526 | { | |
527 | if (n.substr(0, 4) == "log-" && n.size() > 4) { | |
528 | return LogEntry::str_to_level(n.substr(4)); | |
529 | } else { | |
530 | return CLOG_UNKNOWN; | |
531 | } | |
532 | } | |
533 | ||
534 | void LogMonitor::check_subs() | |
535 | { | |
536 | dout(10) << __func__ << dendl; | |
537 | for (map<string, xlist<Subscription*>*>::iterator i = mon->session_map.subs.begin(); | |
538 | i != mon->session_map.subs.end(); | |
539 | ++i) { | |
540 | for (xlist<Subscription*>::iterator j = i->second->begin(); !j.end(); ++j) { | |
541 | if (sub_name_to_id((*j)->type) >= 0) | |
542 | check_sub(*j); | |
543 | } | |
544 | } | |
545 | } | |
546 | ||
547 | void LogMonitor::check_sub(Subscription *s) | |
548 | { | |
549 | dout(10) << __func__ << " client wants " << s->type << " ver " << s->next << dendl; | |
550 | ||
551 | int sub_level = sub_name_to_id(s->type); | |
552 | assert(sub_level >= 0); | |
553 | ||
554 | version_t summary_version = summary.version; | |
555 | if (s->next > summary_version) { | |
556 | dout(10) << __func__ << " client " << s->session->inst | |
557 | << " requested version (" << s->next << ") is greater than ours (" | |
558 | << summary_version << "), which means we already sent him" | |
559 | << " everything we have." << dendl; | |
560 | return; | |
561 | } | |
562 | ||
563 | MLog *mlog = new MLog(mon->monmap->fsid); | |
564 | ||
565 | if (s->next == 0) { | |
566 | /* First timer, heh? */ | |
567 | bool ret = _create_sub_summary(mlog, sub_level); | |
568 | if (!ret) { | |
569 | dout(1) << __func__ << " ret = " << ret << dendl; | |
570 | mlog->put(); | |
571 | return; | |
572 | } | |
573 | } else { | |
574 | /* let us send you an incremental log... */ | |
575 | _create_sub_incremental(mlog, sub_level, s->next); | |
576 | } | |
577 | ||
578 | dout(1) << __func__ << " sending message to " << s->session->inst | |
579 | << " with " << mlog->entries.size() << " entries" | |
580 | << " (version " << mlog->version << ")" << dendl; | |
581 | ||
582 | if (!mlog->entries.empty()) { | |
583 | s->session->con->send_message(mlog); | |
584 | } else { | |
585 | mlog->put(); | |
586 | } | |
587 | if (s->onetime) | |
588 | mon->session_map.remove_sub(s); | |
589 | else | |
590 | s->next = summary_version+1; | |
591 | } | |
592 | ||
593 | /** | |
594 | * Create a log message containing only the last message in the summary. | |
595 | * | |
596 | * @param mlog Log message we'll send to the client. | |
597 | * @param level Maximum log level the client is interested in. | |
598 | * @return 'true' if we consider we successfully populated @mlog; | |
599 | * 'false' otherwise. | |
600 | */ | |
601 | bool LogMonitor::_create_sub_summary(MLog *mlog, int level) | |
602 | { | |
603 | dout(10) << __func__ << dendl; | |
604 | ||
605 | assert(mlog != NULL); | |
606 | ||
607 | if (!summary.tail.size()) | |
608 | return false; | |
609 | ||
610 | list<LogEntry>::reverse_iterator it = summary.tail.rbegin(); | |
611 | for (; it != summary.tail.rend(); ++it) { | |
612 | LogEntry e = *it; | |
613 | if (e.prio < level) | |
614 | continue; | |
615 | ||
616 | mlog->entries.push_back(e); | |
617 | mlog->version = summary.version; | |
618 | break; | |
619 | } | |
620 | ||
621 | return true; | |
622 | } | |
623 | ||
624 | /** | |
625 | * Create an incremental log message from version \p sv to \p summary.version | |
626 | * | |
627 | * @param mlog Log message we'll send to the client with the messages received | |
628 | * since version \p sv, inclusive. | |
629 | * @param level The max log level of the messages the client is interested in. | |
630 | * @param sv The version the client is looking for. | |
631 | */ | |
632 | void LogMonitor::_create_sub_incremental(MLog *mlog, int level, version_t sv) | |
633 | { | |
634 | dout(10) << __func__ << " level " << level << " ver " << sv | |
635 | << " cur summary ver " << summary.version << dendl; | |
636 | ||
637 | if (sv < get_first_committed()) { | |
638 | dout(10) << __func__ << " skipped from " << sv | |
639 | << " to first_committed " << get_first_committed() << dendl; | |
640 | LogEntry le; | |
641 | le.stamp = ceph_clock_now(); | |
642 | le.prio = CLOG_WARN; | |
643 | ostringstream ss; | |
644 | ss << "skipped log messages from " << sv << " to " << get_first_committed(); | |
645 | le.msg = ss.str(); | |
646 | mlog->entries.push_back(le); | |
647 | sv = get_first_committed(); | |
648 | } | |
649 | ||
650 | version_t summary_ver = summary.version; | |
651 | while (sv && sv <= summary_ver) { | |
652 | bufferlist bl; | |
653 | int err = get_version(sv, bl); | |
654 | assert(err == 0); | |
655 | assert(bl.length()); | |
656 | bufferlist::iterator p = bl.begin(); | |
657 | __u8 v; | |
658 | ::decode(v,p); | |
659 | while (!p.end()) { | |
660 | LogEntry le; | |
661 | le.decode(p); | |
662 | ||
663 | if (le.prio < level) { | |
664 | dout(20) << __func__ << " requested " << level | |
665 | << " entry " << le.prio << dendl; | |
666 | continue; | |
667 | } | |
668 | ||
669 | mlog->entries.push_back(le); | |
670 | } | |
671 | mlog->version = sv++; | |
672 | } | |
673 | ||
674 | dout(10) << __func__ << " incremental message ready (" | |
675 | << mlog->entries.size() << " entries)" << dendl; | |
676 | } | |
677 | ||
678 | void LogMonitor::update_log_channels() | |
679 | { | |
680 | ostringstream oss; | |
681 | ||
682 | channels.clear(); | |
683 | ||
684 | int r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_syslog, | |
685 | oss, &channels.log_to_syslog, | |
686 | CLOG_CONFIG_DEFAULT_KEY); | |
687 | if (r < 0) { | |
688 | derr << __func__ << " error parsing 'mon_cluster_log_to_syslog'" << dendl; | |
689 | return; | |
690 | } | |
691 | ||
692 | r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_syslog_level, | |
693 | oss, &channels.syslog_level, | |
694 | CLOG_CONFIG_DEFAULT_KEY); | |
695 | if (r < 0) { | |
696 | derr << __func__ << " error parsing 'mon_cluster_log_to_syslog_level'" | |
697 | << dendl; | |
698 | return; | |
699 | } | |
700 | ||
701 | r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_syslog_facility, | |
702 | oss, &channels.syslog_facility, | |
703 | CLOG_CONFIG_DEFAULT_KEY); | |
704 | if (r < 0) { | |
705 | derr << __func__ << " error parsing 'mon_cluster_log_to_syslog_facility'" | |
706 | << dendl; | |
707 | return; | |
708 | } | |
709 | ||
710 | r = get_conf_str_map_helper(g_conf->mon_cluster_log_file, oss, | |
711 | &channels.log_file, | |
712 | CLOG_CONFIG_DEFAULT_KEY); | |
713 | if (r < 0) { | |
714 | derr << __func__ << " error parsing 'mon_cluster_log_file'" << dendl; | |
715 | return; | |
716 | } | |
717 | ||
718 | r = get_conf_str_map_helper(g_conf->mon_cluster_log_file_level, oss, | |
719 | &channels.log_file_level, | |
720 | CLOG_CONFIG_DEFAULT_KEY); | |
721 | if (r < 0) { | |
722 | derr << __func__ << " error parsing 'mon_cluster_log_file_level'" | |
723 | << dendl; | |
724 | return; | |
725 | } | |
726 | ||
727 | r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog, oss, | |
728 | &channels.log_to_graylog, | |
729 | CLOG_CONFIG_DEFAULT_KEY); | |
730 | if (r < 0) { | |
731 | derr << __func__ << " error parsing 'mon_cluster_log_to_graylog'" | |
732 | << dendl; | |
733 | return; | |
734 | } | |
735 | ||
736 | r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog_host, oss, | |
737 | &channels.log_to_graylog_host, | |
738 | CLOG_CONFIG_DEFAULT_KEY); | |
739 | if (r < 0) { | |
740 | derr << __func__ << " error parsing 'mon_cluster_log_to_graylog_host'" | |
741 | << dendl; | |
742 | return; | |
743 | } | |
744 | ||
745 | r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog_port, oss, | |
746 | &channels.log_to_graylog_port, | |
747 | CLOG_CONFIG_DEFAULT_KEY); | |
748 | if (r < 0) { | |
749 | derr << __func__ << " error parsing 'mon_cluster_log_to_graylog_port'" | |
750 | << dendl; | |
751 | return; | |
752 | } | |
753 | ||
754 | channels.expand_channel_meta(); | |
755 | } | |
756 | ||
757 | void LogMonitor::log_channel_info::expand_channel_meta(map<string,string> &m) | |
758 | { | |
759 | generic_dout(20) << __func__ << " expand map: " << m << dendl; | |
760 | for (map<string,string>::iterator p = m.begin(); p != m.end(); ++p) { | |
761 | m[p->first] = expand_channel_meta(p->second, p->first); | |
762 | } | |
763 | generic_dout(20) << __func__ << " expanded map: " << m << dendl; | |
764 | } | |
765 | ||
766 | string LogMonitor::log_channel_info::expand_channel_meta( | |
767 | const string &input, | |
768 | const string &change_to) | |
769 | { | |
770 | size_t pos = string::npos; | |
771 | string s(input); | |
772 | while ((pos = s.find(LOG_META_CHANNEL)) != string::npos) { | |
773 | string tmp = s.substr(0, pos) + change_to; | |
774 | if (pos+LOG_META_CHANNEL.length() < s.length()) | |
775 | tmp += s.substr(pos+LOG_META_CHANNEL.length()); | |
776 | s = tmp; | |
777 | } | |
778 | generic_dout(20) << __func__ << " from '" << input | |
779 | << "' to '" << s << "'" << dendl; | |
780 | ||
781 | return s; | |
782 | } | |
783 | ||
784 | bool LogMonitor::log_channel_info::do_log_to_syslog(const string &channel) { | |
785 | string v = get_str_map_key(log_to_syslog, channel, | |
786 | &CLOG_CONFIG_DEFAULT_KEY); | |
787 | // We expect booleans, but they are in k/v pairs, kept | |
788 | // as strings, in 'log_to_syslog'. We must ensure | |
789 | // compatibility with existing boolean handling, and so | |
790 | // we are here using a modified version of how | |
791 | // md_config_t::set_val_raw() handles booleans. We will | |
792 | // accept both 'true' and 'false', but will also check for | |
793 | // '1' and '0'. The main distiction between this and the | |
794 | // original code is that we will assume everything not '1', | |
795 | // '0', 'true' or 'false' to be 'false'. | |
796 | bool ret = false; | |
797 | ||
798 | if (boost::iequals(v, "false")) { | |
799 | ret = false; | |
800 | } else if (boost::iequals(v, "true")) { | |
801 | ret = true; | |
802 | } else { | |
803 | std::string err; | |
804 | int b = strict_strtol(v.c_str(), 10, &err); | |
805 | ret = (err.empty() && b == 1); | |
806 | } | |
807 | ||
808 | return ret; | |
809 | } | |
810 | ||
811 | ceph::logging::Graylog::Ref LogMonitor::log_channel_info::get_graylog( | |
812 | const string &channel) | |
813 | { | |
814 | generic_dout(25) << __func__ << " for channel '" | |
815 | << channel << "'" << dendl; | |
816 | ||
817 | if (graylogs.count(channel) == 0) { | |
818 | auto graylog(std::make_shared<ceph::logging::Graylog>("mon")); | |
819 | ||
820 | graylog->set_fsid(g_conf->get_val<uuid_d>("fsid")); | |
821 | graylog->set_hostname(g_conf->host); | |
822 | graylog->set_destination(get_str_map_key(log_to_graylog_host, channel, | |
823 | &CLOG_CONFIG_DEFAULT_KEY), | |
824 | atoi(get_str_map_key(log_to_graylog_port, channel, | |
825 | &CLOG_CONFIG_DEFAULT_KEY).c_str())); | |
826 | ||
827 | graylogs[channel] = graylog; | |
828 | generic_dout(20) << __func__ << " for channel '" | |
829 | << channel << "' to graylog host '" | |
830 | << log_to_graylog_host[channel] << ":" | |
831 | << log_to_graylog_port[channel] | |
832 | << "'" << dendl; | |
833 | } | |
834 | return graylogs[channel]; | |
835 | } | |
836 | ||
837 | void LogMonitor::handle_conf_change(const struct md_config_t *conf, | |
838 | const std::set<std::string> &changed) | |
839 | { | |
840 | if (changed.count("mon_cluster_log_to_syslog") || | |
841 | changed.count("mon_cluster_log_to_syslog_level") || | |
842 | changed.count("mon_cluster_log_to_syslog_facility") || | |
843 | changed.count("mon_cluster_log_file") || | |
844 | changed.count("mon_cluster_log_file_level") || | |
845 | changed.count("mon_cluster_log_to_graylog") || | |
846 | changed.count("mon_cluster_log_to_graylog_host") || | |
847 | changed.count("mon_cluster_log_to_graylog_port")) { | |
848 | update_log_channels(); | |
849 | } | |
850 | } |