]> git.proxmox.com Git - ceph.git/blob - ceph/src/common/LogClient.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / common / LogClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "common/LogClient.h"
16 #include "include/str_map.h"
17 #include "messages/MLog.h"
18 #include "messages/MLogAck.h"
19 #include "msg/Messenger.h"
20 #include "mon/MonMap.h"
21 #include "common/Graylog.h"
22
23 #define dout_subsys ceph_subsys_monc
24
25 using std::map;
26 using std::ostream;
27 using std::ostringstream;
28 using std::string;
29
30 #undef dout_prefix
31 #define dout_prefix _prefix(_dout, this)
32 static ostream& _prefix(std::ostream *_dout, LogClient *logc) {
33 return *_dout << "log_client ";
34 }
35
36 static ostream& _prefix(std::ostream *_dout, LogChannel *lc) {
37 return *_dout << "log_channel(" << lc->get_log_channel() << ") ";
38 }
39
40 LogChannel::LogChannel(CephContext *cct, LogClient *lc, const string &channel)
41 : cct(cct), parent(lc),
42 log_channel(channel), log_to_syslog(false), log_to_monitors(false)
43 {
44 }
45
46 LogChannel::LogChannel(CephContext *cct, LogClient *lc,
47 const string &channel, const string &facility,
48 const string &prio)
49 : cct(cct), parent(lc),
50 log_channel(channel), log_prio(prio), syslog_facility(facility),
51 log_to_syslog(false), log_to_monitors(false)
52 {
53 }
54
55 LogClient::LogClient(CephContext *cct, Messenger *m, MonMap *mm,
56 enum logclient_flag_t flags)
57 : cct(cct), messenger(m), monmap(mm), is_mon(flags & FLAG_MON),
58 last_log_sent(0), last_log(0)
59 {
60 }
61
62 void LogChannel::set_log_to_monitors(bool v)
63 {
64 if (log_to_monitors != v) {
65 parent->reset();
66 log_to_monitors = v;
67 }
68 }
69
70 void LogChannel::update_config(const clog_targets_conf_t& conf_strings)
71 {
72 ldout(cct, 20) << __func__ << " log_to_monitors " << conf_strings.log_to_monitors
73 << " log_to_syslog " << conf_strings.log_to_syslog
74 << " log_channels " << conf_strings.log_channels
75 << " log_prios " << conf_strings.log_prios
76 << dendl;
77
78 bool to_monitors = (conf_strings.log_to_monitors == "true");
79 bool to_syslog = (conf_strings.log_to_syslog == "true");
80 bool to_graylog = (conf_strings.log_to_graylog == "true");
81 auto graylog_port = atoi(conf_strings.log_to_graylog_port.c_str());
82
83 set_log_to_monitors(to_monitors);
84 set_log_to_syslog(to_syslog);
85 set_syslog_facility(conf_strings.log_channels);
86 set_log_prio(conf_strings.log_prios);
87
88 if (to_graylog && !graylog) { /* should but isn't */
89 graylog = std::make_shared<ceph::logging::Graylog>("clog");
90 } else if (!to_graylog && graylog) { /* shouldn't but is */
91 graylog.reset();
92 }
93
94 if (to_graylog && graylog) {
95 graylog->set_fsid(conf_strings.fsid);
96 graylog->set_hostname(conf_strings.host);
97 }
98
99 if (graylog && !conf_strings.log_to_graylog_host.empty() && (graylog_port != 0)) {
100 graylog->set_destination(conf_strings.log_to_graylog_host, graylog_port);
101 }
102
103 ldout(cct, 10) << __func__
104 << " to_monitors: " << (to_monitors ? "true" : "false")
105 << " to_syslog: " << (to_syslog ? "true" : "false")
106 << " syslog_facility: " << conf_strings.log_channels
107 << " prio: " << conf_strings.log_prios
108 << " to_graylog: " << (to_graylog ? "true" : "false")
109 << " graylog_host: " << conf_strings.log_to_graylog_host
110 << " graylog_port: " << graylog_port
111 << ")" << dendl;
112 }
113
114 clog_targets_conf_t LogChannel::parse_client_options(CephContext* conf_cct)
115 {
116 auto parsed_options = parse_log_client_options(conf_cct);
117 update_config(parsed_options);
118 return parsed_options;
119 }
120
121 clog_targets_conf_t LogChannel::parse_log_client_options(CephContext* cct)
122 {
123 clog_targets_conf_t targets;
124
125 targets.log_to_monitors =
126 get_value_via_strmap(cct->_conf.get_val<string>("clog_to_monitors"),
127 log_channel, CLOG_CONFIG_DEFAULT_KEY);
128 targets.log_to_syslog =
129 get_value_via_strmap(cct->_conf.get_val<string>("clog_to_syslog"),
130 log_channel, CLOG_CONFIG_DEFAULT_KEY);
131 targets.log_channels =
132 get_value_via_strmap(cct->_conf.get_val<string>("clog_to_syslog_facility"),
133 log_channel, CLOG_CONFIG_DEFAULT_KEY);
134 targets.log_prios =
135 get_value_via_strmap(cct->_conf.get_val<string>("clog_to_syslog_level"),
136 log_channel, CLOG_CONFIG_DEFAULT_KEY);
137 targets.log_to_graylog =
138 get_value_via_strmap(cct->_conf.get_val<string>("clog_to_graylog"),
139 log_channel, CLOG_CONFIG_DEFAULT_KEY);
140 targets.log_to_graylog_host =
141 get_value_via_strmap(cct->_conf.get_val<string>("clog_to_graylog_host"),
142 log_channel, CLOG_CONFIG_DEFAULT_KEY);
143 targets.log_to_graylog_port =
144 get_value_via_strmap(cct->_conf.get_val<string>("clog_to_graylog_port"),
145 log_channel, CLOG_CONFIG_DEFAULT_KEY);
146
147 targets.fsid = cct->_conf.get_val<uuid_d>("fsid");
148 targets.host = cct->_conf->host;
149 return targets;
150 }
151
152 void LogChannel::do_log(clog_type prio, std::stringstream& ss)
153 {
154 while (!ss.eof()) {
155 string s;
156 getline(ss, s);
157 if (!s.empty())
158 do_log(prio, s);
159 }
160 }
161
162 void LogChannel::do_log(clog_type prio, const std::string& s)
163 {
164 std::lock_guard l(channel_lock);
165 if (CLOG_ERROR == prio) {
166 ldout(cct,-1) << "log " << prio << " : " << s << dendl;
167 } else {
168 ldout(cct,0) << "log " << prio << " : " << s << dendl;
169 }
170 LogEntry e;
171 e.stamp = ceph_clock_now();
172 // seq and who should be set for syslog/graylog/log_to_mon
173 e.addrs = parent->get_myaddrs();
174 e.name = parent->get_myname();
175 e.rank = parent->get_myrank();
176 e.prio = prio;
177 e.msg = s;
178 e.channel = get_log_channel();
179
180 // log to monitor?
181 if (log_to_monitors) {
182 e.seq = parent->queue(e);
183 } else {
184 e.seq = parent->get_next_seq();
185 }
186
187 // log to syslog?
188 if (do_log_to_syslog()) {
189 ldout(cct,0) << __func__ << " log to syslog" << dendl;
190 e.log_to_syslog(get_log_prio(), get_syslog_facility());
191 }
192
193 // log to graylog?
194 if (do_log_to_graylog()) {
195 ldout(cct,0) << __func__ << " log to graylog" << dendl;
196 graylog->log_log_entry(&e);
197 }
198 }
199
200 ceph::ref_t<Message> LogClient::get_mon_log_message(bool flush)
201 {
202 std::lock_guard l(log_lock);
203 if (flush) {
204 if (log_queue.empty())
205 return nullptr;
206 // reset session
207 last_log_sent = log_queue.front().seq;
208 }
209 return _get_mon_log_message();
210 }
211
212 bool LogClient::are_pending()
213 {
214 std::lock_guard l(log_lock);
215 return last_log > last_log_sent;
216 }
217
218 ceph::ref_t<Message> LogClient::_get_mon_log_message()
219 {
220 ceph_assert(ceph_mutex_is_locked(log_lock));
221 if (log_queue.empty())
222 return {};
223
224 // only send entries that haven't been sent yet during this mon
225 // session! monclient needs to call reset_session() on mon session
226 // reset for this to work right.
227
228 if (last_log_sent == last_log)
229 return {};
230
231 // limit entries per message
232 unsigned num_unsent = last_log - last_log_sent;
233 unsigned num_send;
234 if (cct->_conf->mon_client_max_log_entries_per_message > 0)
235 num_send = std::min(num_unsent, (unsigned)cct->_conf->mon_client_max_log_entries_per_message);
236 else
237 num_send = num_unsent;
238
239 ldout(cct,10) << " log_queue is " << log_queue.size() << " last_log " << last_log << " sent " << last_log_sent
240 << " num " << log_queue.size()
241 << " unsent " << num_unsent
242 << " sending " << num_send << dendl;
243 ceph_assert(num_unsent <= log_queue.size());
244 std::deque<LogEntry>::iterator p = log_queue.begin();
245 std::deque<LogEntry> o;
246 while (p->seq <= last_log_sent) {
247 ++p;
248 ceph_assert(p != log_queue.end());
249 }
250 while (num_send--) {
251 ceph_assert(p != log_queue.end());
252 o.push_back(*p);
253 last_log_sent = p->seq;
254 ldout(cct,10) << " will send " << *p << dendl;
255 ++p;
256 }
257
258 return ceph::make_message<MLog>(monmap->get_fsid(),
259 std::move(o));
260 }
261
262 void LogClient::_send_to_mon()
263 {
264 ceph_assert(ceph_mutex_is_locked(log_lock));
265 ceph_assert(is_mon);
266 ceph_assert(messenger->get_myname().is_mon());
267 ldout(cct,10) << __func__ << " log to self" << dendl;
268 auto log = _get_mon_log_message();
269 messenger->get_loopback_connection()->send_message2(std::move(log));
270 }
271
272 version_t LogClient::queue(LogEntry &entry)
273 {
274 std::lock_guard l(log_lock);
275 entry.seq = ++last_log;
276 log_queue.push_back(entry);
277
278 if (is_mon) {
279 _send_to_mon();
280 }
281
282 return entry.seq;
283 }
284
285 void LogClient::reset()
286 {
287 std::lock_guard l(log_lock);
288 if (log_queue.size()) {
289 log_queue.clear();
290 }
291 last_log_sent = last_log;
292 }
293
294 uint64_t LogClient::get_next_seq()
295 {
296 std::lock_guard l(log_lock);
297 return ++last_log;
298 }
299
300 entity_addrvec_t LogClient::get_myaddrs()
301 {
302 return messenger->get_myaddrs();
303 }
304
305 entity_name_t LogClient::get_myrank()
306 {
307 return messenger->get_myname();
308 }
309
310 const EntityName& LogClient::get_myname()
311 {
312 return cct->_conf->name;
313 }
314
315 bool LogClient::handle_log_ack(MLogAck *m)
316 {
317 std::lock_guard l(log_lock);
318 ldout(cct,10) << "handle_log_ack " << *m << dendl;
319
320 version_t last = m->last;
321
322 auto q = log_queue.begin();
323 while (q != log_queue.end()) {
324 const LogEntry &entry(*q);
325 if (entry.seq > last)
326 break;
327 ldout(cct,10) << " logged " << entry << dendl;
328 q = log_queue.erase(q);
329 }
330 return true;
331 }