]> git.proxmox.com Git - ceph.git/blame - ceph/src/common/LogClient.cc
update sources to v12.2.0
[ceph.git] / ceph / src / common / LogClient.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
31f18b77 15#include "common/LogClient.h"
7c673cae 16#include "include/str_map.h"
7c673cae
FG
17#include "messages/MLog.h"
18#include "messages/MLogAck.h"
19#include "mon/MonMap.h"
31f18b77 20#include "common/Graylog.h"
7c673cae
FG
21
22#ifdef DARWIN
23#include <sys/param.h>
24#include <sys/mount.h>
25#endif // DARWIN
26
7c673cae
FG
27#define dout_subsys ceph_subsys_monc
28
29int parse_log_client_options(CephContext *cct,
30 map<string,string> &log_to_monitors,
31 map<string,string> &log_to_syslog,
32 map<string,string> &log_channels,
33 map<string,string> &log_prios,
34 map<string,string> &log_to_graylog,
35 map<string,string> &log_to_graylog_host,
36 map<string,string> &log_to_graylog_port,
37 uuid_d &fsid,
38 string &host)
39{
40 ostringstream oss;
41
42 int r = get_conf_str_map_helper(cct->_conf->clog_to_monitors, oss,
43 &log_to_monitors, CLOG_CONFIG_DEFAULT_KEY);
44 if (r < 0) {
45 lderr(cct) << __func__ << " error parsing 'clog_to_monitors'" << dendl;
46 return r;
47 }
48
49 r = get_conf_str_map_helper(cct->_conf->clog_to_syslog, oss,
50 &log_to_syslog, CLOG_CONFIG_DEFAULT_KEY);
51 if (r < 0) {
52 lderr(cct) << __func__ << " error parsing 'clog_to_syslog'" << dendl;
53 return r;
54 }
55
56 r = get_conf_str_map_helper(cct->_conf->clog_to_syslog_facility, oss,
57 &log_channels, CLOG_CONFIG_DEFAULT_KEY);
58 if (r < 0) {
59 lderr(cct) << __func__ << " error parsing 'clog_to_syslog_facility'" << dendl;
60 return r;
61 }
62
63 r = get_conf_str_map_helper(cct->_conf->clog_to_syslog_level, oss,
64 &log_prios, CLOG_CONFIG_DEFAULT_KEY);
65 if (r < 0) {
66 lderr(cct) << __func__ << " error parsing 'clog_to_syslog_level'" << dendl;
67 return r;
68 }
69
70 r = get_conf_str_map_helper(cct->_conf->clog_to_graylog, oss,
71 &log_to_graylog, CLOG_CONFIG_DEFAULT_KEY);
72 if (r < 0) {
73 lderr(cct) << __func__ << " error parsing 'clog_to_graylog'" << dendl;
74 return r;
75 }
76
77 r = get_conf_str_map_helper(cct->_conf->clog_to_graylog_host, oss,
78 &log_to_graylog_host, CLOG_CONFIG_DEFAULT_KEY);
79 if (r < 0) {
80 lderr(cct) << __func__ << " error parsing 'clog_to_graylog_host'" << dendl;
81 return r;
82 }
83
84 r = get_conf_str_map_helper(cct->_conf->clog_to_graylog_port, oss,
85 &log_to_graylog_port, CLOG_CONFIG_DEFAULT_KEY);
86 if (r < 0) {
87 lderr(cct) << __func__ << " error parsing 'clog_to_graylog_port'" << dendl;
88 return r;
89 }
90
91 fsid = cct->_conf->fsid;
92 host = cct->_conf->host;
93 return 0;
94}
95
96#undef dout_prefix
97#define dout_prefix _prefix(_dout, this)
98static ostream& _prefix(std::ostream *_dout, LogClient *logc) {
99 return *_dout << "log_client ";
100}
101
102static ostream& _prefix(std::ostream *_dout, LogChannel *lc) {
103 return *_dout << "log_channel(" << lc->get_log_channel() << ") ";
104}
105
106LogChannel::LogChannel(CephContext *cct, LogClient *lc, const string &channel)
107 : cct(cct), parent(lc), channel_lock("LogChannel::channel_lock"),
108 log_channel(channel), log_to_syslog(false), log_to_monitors(false)
109{
110}
111
112LogChannel::LogChannel(CephContext *cct, LogClient *lc,
113 const string &channel, const string &facility,
114 const string &prio)
115 : cct(cct), parent(lc), channel_lock("LogChannel::channel_lock"),
116 log_channel(channel), log_prio(prio), syslog_facility(facility),
117 log_to_syslog(false), log_to_monitors(false)
118{
119}
120
121LogClient::LogClient(CephContext *cct, Messenger *m, MonMap *mm,
122 enum logclient_flag_t flags)
123 : cct(cct), messenger(m), monmap(mm), is_mon(flags & FLAG_MON),
124 log_lock("LogClient::log_lock"), last_log_sent(0), last_log(0)
125{
126}
127
128LogClientTemp::LogClientTemp(clog_type type_, LogChannel &parent_)
129 : type(type_), parent(parent_)
130{
131}
132
133LogClientTemp::LogClientTemp(const LogClientTemp &rhs)
134 : type(rhs.type), parent(rhs.parent)
135{
136 // don't want to-- nor can we-- copy the ostringstream
137}
138
139LogClientTemp::~LogClientTemp()
140{
141 if (ss.peek() != EOF)
142 parent.do_log(type, ss);
143}
144
145void LogChannel::update_config(map<string,string> &log_to_monitors,
146 map<string,string> &log_to_syslog,
147 map<string,string> &log_channels,
148 map<string,string> &log_prios,
149 map<string,string> &log_to_graylog,
150 map<string,string> &log_to_graylog_host,
151 map<string,string> &log_to_graylog_port,
152 uuid_d &fsid,
153 string &host)
154{
155 ldout(cct, 20) << __func__ << " log_to_monitors " << log_to_monitors
156 << " log_to_syslog " << log_to_syslog
157 << " log_channels " << log_channels
158 << " log_prios " << log_prios
159 << dendl;
160 bool to_monitors = (get_str_map_key(log_to_monitors, log_channel,
161 &CLOG_CONFIG_DEFAULT_KEY) == "true");
162 bool to_syslog = (get_str_map_key(log_to_syslog, log_channel,
163 &CLOG_CONFIG_DEFAULT_KEY) == "true");
164 string syslog_facility = get_str_map_key(log_channels, log_channel,
165 &CLOG_CONFIG_DEFAULT_KEY);
166 string prio = get_str_map_key(log_prios, log_channel,
167 &CLOG_CONFIG_DEFAULT_KEY);
168 bool to_graylog = (get_str_map_key(log_to_graylog, log_channel,
169 &CLOG_CONFIG_DEFAULT_KEY) == "true");
170 string graylog_host = get_str_map_key(log_to_graylog_host, log_channel,
171 &CLOG_CONFIG_DEFAULT_KEY);
172 string graylog_port_str = get_str_map_key(log_to_graylog_port, log_channel,
173 &CLOG_CONFIG_DEFAULT_KEY);
174 int graylog_port = atoi(graylog_port_str.c_str());
175
176 set_log_to_monitors(to_monitors);
177 set_log_to_syslog(to_syslog);
178 set_syslog_facility(syslog_facility);
179 set_log_prio(prio);
180
181 if (to_graylog && !graylog) { /* should but isn't */
182 graylog = std::make_shared<ceph::logging::Graylog>("clog");
183 } else if (!to_graylog && graylog) { /* shouldn't but is */
184 graylog.reset();
185 }
186
187 if (to_graylog && graylog) {
188 graylog->set_fsid(fsid);
189 graylog->set_hostname(host);
190 }
191
192 if (graylog && (!graylog_host.empty()) && (graylog_port != 0)) {
193 graylog->set_destination(graylog_host, graylog_port);
194 }
195
196 ldout(cct, 10) << __func__
197 << " to_monitors: " << (to_monitors ? "true" : "false")
198 << " to_syslog: " << (to_syslog ? "true" : "false")
199 << " syslog_facility: " << syslog_facility
200 << " prio: " << prio
201 << " to_graylog: " << (to_graylog ? "true" : "false")
202 << " graylog_host: " << graylog_host
203 << " graylog_port: " << graylog_port
204 << ")" << dendl;
205}
206
207void LogChannel::do_log(clog_type prio, std::stringstream& ss)
208{
209 while (!ss.eof()) {
210 string s;
211 getline(ss, s);
212 if (!s.empty())
213 do_log(prio, s);
214 }
215}
216
217void LogChannel::do_log(clog_type prio, const std::string& s)
218{
219 Mutex::Locker l(channel_lock);
220 int lvl = (prio == CLOG_ERROR ? -1 : 0);
221 ldout(cct,lvl) << "log " << prio << " : " << s << dendl;
222 LogEntry e;
223 e.stamp = ceph_clock_now();
224 // seq and who should be set for syslog/graylog/log_to_mon
225 e.who = parent->get_myinst();
31f18b77 226 e.name = parent->get_myname();
7c673cae
FG
227 e.prio = prio;
228 e.msg = s;
229 e.channel = get_log_channel();
230
b5b8bbf5
FG
231 // log to monitor?
232 if (log_to_monitors) {
233 e.seq = parent->queue(e);
234 } else {
235 e.seq = parent->get_next_seq();
236 }
237
7c673cae
FG
238 // log to syslog?
239 if (do_log_to_syslog()) {
240 ldout(cct,0) << __func__ << " log to syslog" << dendl;
241 e.log_to_syslog(get_log_prio(), get_syslog_facility());
242 }
243
244 // log to graylog?
245 if (do_log_to_graylog()) {
246 ldout(cct,0) << __func__ << " log to graylog" << dendl;
247 graylog->log_log_entry(&e);
248 }
7c673cae
FG
249}
250
251Message *LogClient::get_mon_log_message(bool flush)
252{
253 Mutex::Locker l(log_lock);
254 if (flush) {
255 if (log_queue.empty())
256 return nullptr;
257 // reset session
258 last_log_sent = log_queue.front().seq;
259 }
260 return _get_mon_log_message();
261}
262
263bool LogClient::are_pending()
264{
265 Mutex::Locker l(log_lock);
266 return last_log > last_log_sent;
267}
268
269Message *LogClient::_get_mon_log_message()
270{
271 assert(log_lock.is_locked());
b5b8bbf5
FG
272 if (log_queue.empty())
273 return NULL;
7c673cae
FG
274
275 // only send entries that haven't been sent yet during this mon
276 // session! monclient needs to call reset_session() on mon session
277 // reset for this to work right.
278
279 if (last_log_sent == last_log)
280 return NULL;
281
282 // limit entries per message
283 unsigned num_unsent = last_log - last_log_sent;
284 unsigned num_send;
285 if (cct->_conf->mon_client_max_log_entries_per_message > 0)
286 num_send = MIN(num_unsent, (unsigned)cct->_conf->mon_client_max_log_entries_per_message);
287 else
288 num_send = num_unsent;
289
290 ldout(cct,10) << " log_queue is " << log_queue.size() << " last_log " << last_log << " sent " << last_log_sent
291 << " num " << log_queue.size()
292 << " unsent " << num_unsent
293 << " sending " << num_send << dendl;
294 assert(num_unsent <= log_queue.size());
295 std::deque<LogEntry>::iterator p = log_queue.begin();
296 std::deque<LogEntry> o;
297 while (p->seq <= last_log_sent) {
298 ++p;
299 assert(p != log_queue.end());
300 }
301 while (num_send--) {
302 assert(p != log_queue.end());
303 o.push_back(*p);
304 last_log_sent = p->seq;
305 ldout(cct,10) << " will send " << *p << dendl;
306 ++p;
307 }
308
309 MLog *log = new MLog(monmap->get_fsid());
310 log->entries.swap(o);
311
312 return log;
313}
314
315void LogClient::_send_to_mon()
316{
317 assert(log_lock.is_locked());
318 assert(is_mon);
319 assert(messenger->get_myname().is_mon());
320 ldout(cct,10) << __func__ << "log to self" << dendl;
321 Message *log = _get_mon_log_message();
322 messenger->get_loopback_connection()->send_message(log);
323}
324
325version_t LogClient::queue(LogEntry &entry)
326{
327 Mutex::Locker l(log_lock);
b5b8bbf5 328 entry.seq = ++last_log;
7c673cae
FG
329 log_queue.push_back(entry);
330
331 if (is_mon) {
332 _send_to_mon();
333 }
334
335 return entry.seq;
336}
337
338uint64_t LogClient::get_next_seq()
339{
b5b8bbf5 340 Mutex::Locker l(log_lock);
7c673cae
FG
341 return ++last_log;
342}
343
344const entity_inst_t& LogClient::get_myinst()
345{
346 return messenger->get_myinst();
347}
348
31f18b77
FG
349const EntityName& LogClient::get_myname()
350{
351 return cct->_conf->name;
352}
353
7c673cae
FG
354bool LogClient::handle_log_ack(MLogAck *m)
355{
356 Mutex::Locker l(log_lock);
357 ldout(cct,10) << "handle_log_ack " << *m << dendl;
358
359 version_t last = m->last;
360
361 deque<LogEntry>::iterator q = log_queue.begin();
362 while (q != log_queue.end()) {
363 const LogEntry &entry(*q);
364 if (entry.seq > last)
365 break;
366 ldout(cct,10) << " logged " << entry << dendl;
367 q = log_queue.erase(q);
368 }
369 return true;
370}
371