]> git.proxmox.com Git - ceph.git/blob - ceph/src/common/LogClient.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / common / LogClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "common/LogClient.h"
16 #include "include/str_map.h"
17 #include "messages/MLog.h"
18 #include "messages/MLogAck.h"
19 #include "msg/Messenger.h"
20 #include "mon/MonMap.h"
21 #include "common/Graylog.h"
22
23 #define dout_subsys ceph_subsys_monc
24
25 int parse_log_client_options(CephContext *cct,
26 map<string,string> &log_to_monitors,
27 map<string,string> &log_to_syslog,
28 map<string,string> &log_channels,
29 map<string,string> &log_prios,
30 map<string,string> &log_to_graylog,
31 map<string,string> &log_to_graylog_host,
32 map<string,string> &log_to_graylog_port,
33 uuid_d &fsid,
34 string &host)
35 {
36 ostringstream oss;
37
38 int r = get_conf_str_map_helper(
39 cct->_conf.get_val<string>("clog_to_monitors"), oss,
40 &log_to_monitors, CLOG_CONFIG_DEFAULT_KEY);
41 if (r < 0) {
42 lderr(cct) << __func__ << " error parsing 'clog_to_monitors'" << dendl;
43 return r;
44 }
45
46 r = get_conf_str_map_helper(
47 cct->_conf.get_val<string>("clog_to_syslog"), oss,
48 &log_to_syslog, CLOG_CONFIG_DEFAULT_KEY);
49 if (r < 0) {
50 lderr(cct) << __func__ << " error parsing 'clog_to_syslog'" << dendl;
51 return r;
52 }
53
54 r = get_conf_str_map_helper(
55 cct->_conf.get_val<string>("clog_to_syslog_facility"), oss,
56 &log_channels, CLOG_CONFIG_DEFAULT_KEY);
57 if (r < 0) {
58 lderr(cct) << __func__ << " error parsing 'clog_to_syslog_facility'" << dendl;
59 return r;
60 }
61
62 r = get_conf_str_map_helper(
63 cct->_conf.get_val<string>("clog_to_syslog_level"), oss,
64 &log_prios, CLOG_CONFIG_DEFAULT_KEY);
65 if (r < 0) {
66 lderr(cct) << __func__ << " error parsing 'clog_to_syslog_level'" << dendl;
67 return r;
68 }
69
70 r = get_conf_str_map_helper(
71 cct->_conf.get_val<string>("clog_to_graylog"), oss,
72 &log_to_graylog, CLOG_CONFIG_DEFAULT_KEY);
73 if (r < 0) {
74 lderr(cct) << __func__ << " error parsing 'clog_to_graylog'" << dendl;
75 return r;
76 }
77
78 r = get_conf_str_map_helper(
79 cct->_conf.get_val<string>("clog_to_graylog_host"), oss,
80 &log_to_graylog_host, CLOG_CONFIG_DEFAULT_KEY);
81 if (r < 0) {
82 lderr(cct) << __func__ << " error parsing 'clog_to_graylog_host'" << dendl;
83 return r;
84 }
85
86 r = get_conf_str_map_helper(
87 cct->_conf.get_val<string>("clog_to_graylog_port"), oss,
88 &log_to_graylog_port, CLOG_CONFIG_DEFAULT_KEY);
89 if (r < 0) {
90 lderr(cct) << __func__ << " error parsing 'clog_to_graylog_port'" << dendl;
91 return r;
92 }
93
94 fsid = cct->_conf.get_val<uuid_d>("fsid");
95 host = cct->_conf->host;
96 return 0;
97 }
98
99 #undef dout_prefix
100 #define dout_prefix _prefix(_dout, this)
101 static ostream& _prefix(std::ostream *_dout, LogClient *logc) {
102 return *_dout << "log_client ";
103 }
104
105 static ostream& _prefix(std::ostream *_dout, LogChannel *lc) {
106 return *_dout << "log_channel(" << lc->get_log_channel() << ") ";
107 }
108
109 LogChannel::LogChannel(CephContext *cct, LogClient *lc, const string &channel)
110 : cct(cct), parent(lc),
111 log_channel(channel), log_to_syslog(false), log_to_monitors(false)
112 {
113 }
114
115 LogChannel::LogChannel(CephContext *cct, LogClient *lc,
116 const string &channel, const string &facility,
117 const string &prio)
118 : cct(cct), parent(lc),
119 log_channel(channel), log_prio(prio), syslog_facility(facility),
120 log_to_syslog(false), log_to_monitors(false)
121 {
122 }
123
124 LogClient::LogClient(CephContext *cct, Messenger *m, MonMap *mm,
125 enum logclient_flag_t flags)
126 : cct(cct), messenger(m), monmap(mm), is_mon(flags & FLAG_MON),
127 last_log_sent(0), last_log(0)
128 {
129 }
130
131 void LogChannel::update_config(map<string,string> &log_to_monitors,
132 map<string,string> &log_to_syslog,
133 map<string,string> &log_channels,
134 map<string,string> &log_prios,
135 map<string,string> &log_to_graylog,
136 map<string,string> &log_to_graylog_host,
137 map<string,string> &log_to_graylog_port,
138 uuid_d &fsid,
139 string &host)
140 {
141 ldout(cct, 20) << __func__ << " log_to_monitors " << log_to_monitors
142 << " log_to_syslog " << log_to_syslog
143 << " log_channels " << log_channels
144 << " log_prios " << log_prios
145 << dendl;
146 bool to_monitors = (get_str_map_key(log_to_monitors, log_channel,
147 &CLOG_CONFIG_DEFAULT_KEY) == "true");
148 bool to_syslog = (get_str_map_key(log_to_syslog, log_channel,
149 &CLOG_CONFIG_DEFAULT_KEY) == "true");
150 string syslog_facility = get_str_map_key(log_channels, log_channel,
151 &CLOG_CONFIG_DEFAULT_KEY);
152 string prio = get_str_map_key(log_prios, log_channel,
153 &CLOG_CONFIG_DEFAULT_KEY);
154 bool to_graylog = (get_str_map_key(log_to_graylog, log_channel,
155 &CLOG_CONFIG_DEFAULT_KEY) == "true");
156 string graylog_host = get_str_map_key(log_to_graylog_host, log_channel,
157 &CLOG_CONFIG_DEFAULT_KEY);
158 string graylog_port_str = get_str_map_key(log_to_graylog_port, log_channel,
159 &CLOG_CONFIG_DEFAULT_KEY);
160 int graylog_port = atoi(graylog_port_str.c_str());
161
162 set_log_to_monitors(to_monitors);
163 set_log_to_syslog(to_syslog);
164 set_syslog_facility(syslog_facility);
165 set_log_prio(prio);
166
167 if (to_graylog && !graylog) { /* should but isn't */
168 graylog = std::make_shared<ceph::logging::Graylog>("clog");
169 } else if (!to_graylog && graylog) { /* shouldn't but is */
170 graylog.reset();
171 }
172
173 if (to_graylog && graylog) {
174 graylog->set_fsid(fsid);
175 graylog->set_hostname(host);
176 }
177
178 if (graylog && (!graylog_host.empty()) && (graylog_port != 0)) {
179 graylog->set_destination(graylog_host, graylog_port);
180 }
181
182 ldout(cct, 10) << __func__
183 << " to_monitors: " << (to_monitors ? "true" : "false")
184 << " to_syslog: " << (to_syslog ? "true" : "false")
185 << " syslog_facility: " << syslog_facility
186 << " prio: " << prio
187 << " to_graylog: " << (to_graylog ? "true" : "false")
188 << " graylog_host: " << graylog_host
189 << " graylog_port: " << graylog_port
190 << ")" << dendl;
191 }
192
193 void LogChannel::do_log(clog_type prio, std::stringstream& ss)
194 {
195 while (!ss.eof()) {
196 string s;
197 getline(ss, s);
198 if (!s.empty())
199 do_log(prio, s);
200 }
201 }
202
203 void LogChannel::do_log(clog_type prio, const std::string& s)
204 {
205 std::lock_guard l(channel_lock);
206 if (CLOG_ERROR == prio) {
207 ldout(cct,-1) << "log " << prio << " : " << s << dendl;
208 } else {
209 ldout(cct,0) << "log " << prio << " : " << s << dendl;
210 }
211 LogEntry e;
212 e.stamp = ceph_clock_now();
213 // seq and who should be set for syslog/graylog/log_to_mon
214 e.addrs = parent->get_myaddrs();
215 e.name = parent->get_myname();
216 e.rank = parent->get_myrank();
217 e.prio = prio;
218 e.msg = s;
219 e.channel = get_log_channel();
220
221 // log to monitor?
222 if (log_to_monitors) {
223 e.seq = parent->queue(e);
224 } else {
225 e.seq = parent->get_next_seq();
226 }
227
228 // log to syslog?
229 if (do_log_to_syslog()) {
230 ldout(cct,0) << __func__ << " log to syslog" << dendl;
231 e.log_to_syslog(get_log_prio(), get_syslog_facility());
232 }
233
234 // log to graylog?
235 if (do_log_to_graylog()) {
236 ldout(cct,0) << __func__ << " log to graylog" << dendl;
237 graylog->log_log_entry(&e);
238 }
239 }
240
241 ceph::ref_t<Message> LogClient::get_mon_log_message(bool flush)
242 {
243 std::lock_guard l(log_lock);
244 if (flush) {
245 if (log_queue.empty())
246 return nullptr;
247 // reset session
248 last_log_sent = log_queue.front().seq;
249 }
250 return _get_mon_log_message();
251 }
252
253 bool LogClient::are_pending()
254 {
255 std::lock_guard l(log_lock);
256 return last_log > last_log_sent;
257 }
258
259 ceph::ref_t<Message> LogClient::_get_mon_log_message()
260 {
261 ceph_assert(ceph_mutex_is_locked(log_lock));
262 if (log_queue.empty())
263 return {};
264
265 // only send entries that haven't been sent yet during this mon
266 // session! monclient needs to call reset_session() on mon session
267 // reset for this to work right.
268
269 if (last_log_sent == last_log)
270 return {};
271
272 // limit entries per message
273 unsigned num_unsent = last_log - last_log_sent;
274 unsigned num_send;
275 if (cct->_conf->mon_client_max_log_entries_per_message > 0)
276 num_send = std::min(num_unsent, (unsigned)cct->_conf->mon_client_max_log_entries_per_message);
277 else
278 num_send = num_unsent;
279
280 ldout(cct,10) << " log_queue is " << log_queue.size() << " last_log " << last_log << " sent " << last_log_sent
281 << " num " << log_queue.size()
282 << " unsent " << num_unsent
283 << " sending " << num_send << dendl;
284 ceph_assert(num_unsent <= log_queue.size());
285 std::deque<LogEntry>::iterator p = log_queue.begin();
286 std::deque<LogEntry> o;
287 while (p->seq <= last_log_sent) {
288 ++p;
289 ceph_assert(p != log_queue.end());
290 }
291 while (num_send--) {
292 ceph_assert(p != log_queue.end());
293 o.push_back(*p);
294 last_log_sent = p->seq;
295 ldout(cct,10) << " will send " << *p << dendl;
296 ++p;
297 }
298
299 return ceph::make_message<MLog>(monmap->get_fsid(),
300 std::move(o));
301 }
302
303 void LogClient::_send_to_mon()
304 {
305 ceph_assert(ceph_mutex_is_locked(log_lock));
306 ceph_assert(is_mon);
307 ceph_assert(messenger->get_myname().is_mon());
308 ldout(cct,10) << __func__ << " log to self" << dendl;
309 auto log = _get_mon_log_message();
310 messenger->get_loopback_connection()->send_message2(std::move(log));
311 }
312
313 version_t LogClient::queue(LogEntry &entry)
314 {
315 std::lock_guard l(log_lock);
316 entry.seq = ++last_log;
317 log_queue.push_back(entry);
318
319 if (is_mon) {
320 _send_to_mon();
321 }
322
323 return entry.seq;
324 }
325
326 uint64_t LogClient::get_next_seq()
327 {
328 std::lock_guard l(log_lock);
329 return ++last_log;
330 }
331
332 entity_addrvec_t LogClient::get_myaddrs()
333 {
334 return messenger->get_myaddrs();
335 }
336
337 entity_name_t LogClient::get_myrank()
338 {
339 return messenger->get_myname();
340 }
341
342 const EntityName& LogClient::get_myname()
343 {
344 return cct->_conf->name;
345 }
346
347 bool LogClient::handle_log_ack(MLogAck *m)
348 {
349 std::lock_guard l(log_lock);
350 ldout(cct,10) << "handle_log_ack " << *m << dendl;
351
352 version_t last = m->last;
353
354 deque<LogEntry>::iterator q = log_queue.begin();
355 while (q != log_queue.end()) {
356 const LogEntry &entry(*q);
357 if (entry.seq > last)
358 break;
359 ldout(cct,10) << " logged " << entry << dendl;
360 q = log_queue.erase(q);
361 }
362 return true;
363 }
364