1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "common/LogClient.h"
16 #include "include/str_map.h"
17 #include "messages/MLog.h"
18 #include "messages/MLogAck.h"
19 #include "msg/Messenger.h"
20 #include "mon/MonMap.h"
21 #include "common/Graylog.h"
23 #define dout_subsys ceph_subsys_monc
// Fill the cluster-log ("clog") option maps from the configuration in cct.
//
// Each clog_* option is read as a string with cct->_conf.get_val<string>()
// and passed to get_conf_str_map_helper() together with oss, a pointer to
// the destination map and CLOG_CONFIG_DEFAULT_KEY; the helper's return
// value is stored in r.  Option -> destination map:
//   clog_to_monitors         -> log_to_monitors
//   clog_to_syslog           -> log_to_syslog
//   clog_to_syslog_facility  -> log_channels
//   clog_to_syslog_level     -> log_prios
//   clog_to_graylog          -> log_to_graylog
//   clog_to_graylog_host     -> log_to_graylog_host
//   clog_to_graylog_port     -> log_to_graylog_port
// Afterwards fsid is assigned from the "fsid" option and host from
// cct->_conf->host.
//
// NOTE(review): going by the embedded line numbers, this listing omits
// several original lines: the end of the parameter list (fsid and host are
// presumably trailing out-parameters), the declaration of oss, whatever
// test on r guards each lderr() message, and the return statements.
// Confirm against the complete file before relying on the error path.
25 int parse_log_client_options(CephContext
*cct
,
26 map
<string
,string
> &log_to_monitors
,
27 map
<string
,string
> &log_to_syslog
,
28 map
<string
,string
> &log_channels
,
29 map
<string
,string
> &log_prios
,
30 map
<string
,string
> &log_to_graylog
,
31 map
<string
,string
> &log_to_graylog_host
,
32 map
<string
,string
> &log_to_graylog_port
,
// Per-channel "send to monitors" switches.
38 int r
= get_conf_str_map_helper(
39 cct
->_conf
.get_val
<string
>("clog_to_monitors"), oss
,
40 &log_to_monitors
, CLOG_CONFIG_DEFAULT_KEY
);
42 lderr(cct
) << __func__
<< " error parsing 'clog_to_monitors'" << dendl
;
// Per-channel "send to syslog" switches.
46 r
= get_conf_str_map_helper(
47 cct
->_conf
.get_val
<string
>("clog_to_syslog"), oss
,
48 &log_to_syslog
, CLOG_CONFIG_DEFAULT_KEY
);
50 lderr(cct
) << __func__
<< " error parsing 'clog_to_syslog'" << dendl
;
// Syslog facility per channel; note the destination map is log_channels.
54 r
= get_conf_str_map_helper(
55 cct
->_conf
.get_val
<string
>("clog_to_syslog_facility"), oss
,
56 &log_channels
, CLOG_CONFIG_DEFAULT_KEY
);
58 lderr(cct
) << __func__
<< " error parsing 'clog_to_syslog_facility'" << dendl
;
// Syslog level per channel; stored in log_prios.
62 r
= get_conf_str_map_helper(
63 cct
->_conf
.get_val
<string
>("clog_to_syslog_level"), oss
,
64 &log_prios
, CLOG_CONFIG_DEFAULT_KEY
);
66 lderr(cct
) << __func__
<< " error parsing 'clog_to_syslog_level'" << dendl
;
// Per-channel "send to graylog" switches.
70 r
= get_conf_str_map_helper(
71 cct
->_conf
.get_val
<string
>("clog_to_graylog"), oss
,
72 &log_to_graylog
, CLOG_CONFIG_DEFAULT_KEY
);
74 lderr(cct
) << __func__
<< " error parsing 'clog_to_graylog'" << dendl
;
// Graylog destination host per channel.
78 r
= get_conf_str_map_helper(
79 cct
->_conf
.get_val
<string
>("clog_to_graylog_host"), oss
,
80 &log_to_graylog_host
, CLOG_CONFIG_DEFAULT_KEY
);
82 lderr(cct
) << __func__
<< " error parsing 'clog_to_graylog_host'" << dendl
;
// Graylog destination port per channel (kept as a string in the map).
86 r
= get_conf_str_map_helper(
87 cct
->_conf
.get_val
<string
>("clog_to_graylog_port"), oss
,
88 &log_to_graylog_port
, CLOG_CONFIG_DEFAULT_KEY
);
90 lderr(cct
) << __func__
<< " error parsing 'clog_to_graylog_port'" << dendl
;
// Identity values taken directly from the configuration.
94 fsid
= cct
->_conf
.get_val
<uuid_d
>("fsid");
95 host
= cct
->_conf
->host
;
100 #define dout_prefix _prefix(_dout, this)
// Debug-output prefix for LogClient: writes "log_client " to *_dout and
// returns that stream.  Selected through the dout_prefix macro above, which
// expands to _prefix(_dout, this).  The logc argument is not used in the
// visible body; it serves to pick this overload.
// NOTE(review): the closing brace is not part of this listing.
101 static ostream
& _prefix(std::ostream
*_dout
, LogClient
*logc
) {
102 return *_dout
<< "log_client ";
// Debug-output prefix for LogChannel: writes "log_channel(<name>) " to
// *_dout, where <name> is lc->get_log_channel(), and returns that stream.
// NOTE(review): the closing brace is not part of this listing.
105 static ostream
& _prefix(std::ostream
*_dout
, LogChannel
*lc
) {
106 return *_dout
<< "log_channel(" << lc
->get_log_channel() << ") ";
// Construct a channel named `channel` that belongs to LogClient `lc`.
// Stores cct, the parent client and the channel name; both log_to_syslog
// and log_to_monitors start out false.
// NOTE(review): the constructor body is not visible in this listing.
109 LogChannel::LogChannel(CephContext
*cct
, LogClient
*lc
, const string
&channel
)
110 : cct(cct
), parent(lc
),
111 log_channel(channel
), log_to_syslog(false), log_to_monitors(false)
// Construct a channel with an explicit syslog facility and priority.
// In addition to cct, parent and the channel name, the initializer list
// sets log_prio from prio and syslog_facility from facility; log_to_syslog
// and log_to_monitors start out false.
// NOTE(review): the declaration of the `prio` parameter (original line 117)
// and the constructor body are not visible in this listing.
115 LogChannel::LogChannel(CephContext
*cct
, LogClient
*lc
,
116 const string
&channel
, const string
&facility
,
118 : cct(cct
), parent(lc
),
119 log_channel(channel
), log_prio(prio
), syslog_facility(facility
),
120 log_to_syslog(false), log_to_monitors(false)
// Construct a LogClient.
//   cct   - context stored for later configuration/debug-log access
//   m     - messenger, stored as `messenger`
//   mm    - monitor map, stored as `monmap`
//   flags - is_mon is initialised from (flags & FLAG_MON)
// Both sequence counters, last_log_sent and last_log, start at 0.
// NOTE(review): the constructor body is not visible in this listing.
124 LogClient::LogClient(CephContext
*cct
, Messenger
*m
, MonMap
*mm
,
125 enum logclient_flag_t flags
)
126 : cct(cct
), messenger(m
), monmap(mm
), is_mon(flags
& FLAG_MON
),
127 last_log_sent(0), last_log(0)
// Apply the parsed clog option maps to this channel.
//
// For each map the value for this channel (log_channel) is fetched with
// get_str_map_key(), then pushed into the channel through the set_*()
// setters and the graylog object is created/configured as needed.
//
// NOTE(review): going by the embedded line numbers, this listing omits the
// end of the parameter list (fsid and host, used below, are presumably
// declared there), the terminators of the two ldout() statements, a
// statement at original line 165, the body of the "shouldn't but is"
// branch and the closing braces.  Confirm against the complete file.
131 void LogChannel::update_config(map
<string
,string
> &log_to_monitors
,
132 map
<string
,string
> &log_to_syslog
,
133 map
<string
,string
> &log_channels
,
134 map
<string
,string
> &log_prios
,
135 map
<string
,string
> &log_to_graylog
,
136 map
<string
,string
> &log_to_graylog_host
,
137 map
<string
,string
> &log_to_graylog_port
,
// Trace the incoming maps at debug level 20.
141 ldout(cct
, 20) << __func__
<< " log_to_monitors " << log_to_monitors
142 << " log_to_syslog " << log_to_syslog
143 << " log_channels " << log_channels
144 << " log_prios " << log_prios
// Per-channel lookups.  &CLOG_CONFIG_DEFAULT_KEY is passed as the third
// argument of get_str_map_key() (presumably the fallback key used when the
// channel has no entry of its own -- confirm in include/str_map.h).
// Boolean options are enabled only by the exact string "true".
146 bool to_monitors
= (get_str_map_key(log_to_monitors
, log_channel
,
147 &CLOG_CONFIG_DEFAULT_KEY
) == "true");
148 bool to_syslog
= (get_str_map_key(log_to_syslog
, log_channel
,
149 &CLOG_CONFIG_DEFAULT_KEY
) == "true");
150 string syslog_facility
= get_str_map_key(log_channels
, log_channel
,
151 &CLOG_CONFIG_DEFAULT_KEY
);
152 string prio
= get_str_map_key(log_prios
, log_channel
,
153 &CLOG_CONFIG_DEFAULT_KEY
);
154 bool to_graylog
= (get_str_map_key(log_to_graylog
, log_channel
,
155 &CLOG_CONFIG_DEFAULT_KEY
) == "true");
156 string graylog_host
= get_str_map_key(log_to_graylog_host
, log_channel
,
157 &CLOG_CONFIG_DEFAULT_KEY
);
158 string graylog_port_str
= get_str_map_key(log_to_graylog_port
, log_channel
,
159 &CLOG_CONFIG_DEFAULT_KEY
);
// atoi() yields 0 when the text is not numeric; a port of 0 later
// suppresses the set_destination() call.
160 int graylog_port
= atoi(graylog_port_str
.c_str());
// Push the plain settings into the channel.
162 set_log_to_monitors(to_monitors
);
163 set_log_to_syslog(to_syslog
);
164 set_syslog_facility(syslog_facility
);
// Reconcile the graylog object with the requested state: create it when
// it is wanted but absent ...
167 if (to_graylog
&& !graylog
) { /* should but isn't */
168 graylog
= std::make_shared
<ceph::logging::Graylog
>("clog");
// ... and handle the case where it exists but is no longer wanted (the
// statement for that branch is not visible in this listing).
169 } else if (!to_graylog
&& graylog
) { /* shouldn't but is */
// When graylog is enabled and present, refresh its identity fields.
173 if (to_graylog
&& graylog
) {
174 graylog
->set_fsid(fsid
);
175 graylog
->set_hostname(host
);
// Only set a destination when both a host and a non-zero port are known.
178 if (graylog
&& (!graylog_host
.empty()) && (graylog_port
!= 0)) {
179 graylog
->set_destination(graylog_host
, graylog_port
);
// Summarise the effective settings at debug level 10.
182 ldout(cct
, 10) << __func__
183 << " to_monitors: " << (to_monitors
? "true" : "false")
184 << " to_syslog: " << (to_syslog
? "true" : "false")
185 << " syslog_facility: " << syslog_facility
187 << " to_graylog: " << (to_graylog
? "true" : "false")
188 << " graylog_host: " << graylog_host
189 << " graylog_port: " << graylog_port
// do_log overload taking the message text as a std::stringstream, logged
// with priority `prio`.
// NOTE(review): only the signature is present in this listing; the body
// (original lines 194-201) is not visible, so how the stream is consumed
// must be confirmed against the complete file.
193 void LogChannel::do_log(clog_type prio
, std::stringstream
& ss
)
// Log one message `s` with priority `prio` on this channel.
//
// Runs under channel_lock.  The message is written to the local debug log,
// a log entry `e` is filled in (timestamp, addresses, name, rank, channel,
// sequence number) and the entry is then handed to syslog and/or graylog
// when do_log_to_syslog() / do_log_to_graylog() say so.
//
// NOTE(review): going by the embedded line numbers, this listing omits the
// declaration of `e`, the `else` lines that pair the two ldout() calls and
// the two e.seq assignments, a couple of field assignments and the closing
// braces.  Confirm against the complete file.
203 void LogChannel::do_log(clog_type prio
, const std::string
& s
)
205 std::lock_guard
l(channel_lock
);
// CLOG_ERROR messages go to the debug log at level -1 ...
206 if (CLOG_ERROR
== prio
) {
207 ldout(cct
,-1) << "log " << prio
<< " : " << s
<< dendl
;
// ... and the same text is emitted at level 0 (presumably the else branch
// for all other priorities -- the else line is not visible here).
209 ldout(cct
,0) << "log " << prio
<< " : " << s
<< dendl
;
212 e
.stamp
= ceph_clock_now();
213 // seq and who should be set for syslog/graylog/log_to_mon
214 e
.addrs
= parent
->get_myaddrs();
215 e
.name
= parent
->get_myname();
216 e
.rank
= parent
->get_myrank();
219 e
.channel
= get_log_channel();
// When monitor logging is on, parent->queue() both enqueues the entry and
// supplies its sequence number ...
222 if (log_to_monitors
) {
223 e
.seq
= parent
->queue(e
);
// ... while this assignment takes a sequence number from get_next_seq()
// without queueing (presumably the else branch -- confirm).
225 e
.seq
= parent
->get_next_seq();
229 if (do_log_to_syslog()) {
230 ldout(cct
,0) << __func__
<< " log to syslog" << dendl
;
231 e
.log_to_syslog(get_log_prio(), get_syslog_facility());
235 if (do_log_to_graylog()) {
236 ldout(cct
,0) << __func__
<< " log to graylog" << dendl
;
237 graylog
->log_log_entry(&e
);
// Build the next log message for the monitor.  Takes log_lock and returns
// whatever _get_mon_log_message() produces.
//
// The visible code also tests log_queue.empty() and rewinds last_log_sent
// to the sequence number of the entry at the front of the queue.
// NOTE(review): the lines that tie those statements to the `flush`
// parameter, and the value returned when the queue is empty, are not
// visible in this listing -- confirm against the complete file.
241 ceph::ref_t
<Message
> LogClient::get_mon_log_message(bool flush
)
243 std::lock_guard
l(log_lock
);
245 if (log_queue
.empty())
248 last_log_sent
= log_queue
.front().seq
;
250 return _get_mon_log_message();
// Return true when entries have been queued that were not yet sent, i.e.
// last_log is greater than last_log_sent.  The comparison is made while
// holding log_lock.
253 bool LogClient::are_pending()
255 std::lock_guard
l(log_lock
);
256 return last_log
> last_log_sent
;
// Assemble an MLog message carrying the queued entries that have not been
// sent yet.  The caller must already hold log_lock (asserted below).
//
// The number of entries to send is last_log - last_log_sent, capped by
// mon_client_max_log_entries_per_message when that option is > 0.
// last_log_sent is advanced to the seq of each entry selected for sending.
//
// NOTE(review): going by the embedded line numbers, this listing omits the
// early-return statements after the two guard conditions, the declaration
// of num_send, the `else` line, the statements inside both loops that
// advance p and fill `o`, and the remaining MLog constructor arguments.
// Confirm against the complete file.
259 ceph::ref_t
<Message
> LogClient::_get_mon_log_message()
261 ceph_assert(ceph_mutex_is_locked(log_lock
));
// Guard: nothing queued.
262 if (log_queue
.empty())
265 // only send entries that haven't been sent yet during this mon
266 // session! monclient needs to call reset_session() on mon session
267 // reset for this to work right.
// Guard: everything queued has already been sent.
269 if (last_log_sent
== last_log
)
272 // limit entries per message
273 unsigned num_unsent
= last_log
- last_log_sent
;
275 if (cct
->_conf
->mon_client_max_log_entries_per_message
> 0)
276 num_send
= std::min(num_unsent
, (unsigned)cct
->_conf
->mon_client_max_log_entries_per_message
);
278 num_send
= num_unsent
;
280 ldout(cct
,10) << " log_queue is " << log_queue
.size() << " last_log " << last_log
<< " sent " << last_log_sent
281 << " num " << log_queue
.size()
282 << " unsent " << num_unsent
283 << " sending " << num_send
<< dendl
;
// Every unsent entry must still be in the queue.
284 ceph_assert(num_unsent
<= log_queue
.size());
285 std::deque
<LogEntry
>::iterator p
= log_queue
.begin();
286 std::deque
<LogEntry
> o
;
// Skip over entries whose seq is at or below last_log_sent; the assert
// guards against running off the end of the queue.
287 while (p
->seq
<= last_log_sent
) {
289 ceph_assert(p
!= log_queue
.end());
// Selection of the entries to send: each chosen entry becomes the new
// last_log_sent and is traced at debug level 10.
292 ceph_assert(p
!= log_queue
.end());
294 last_log_sent
= p
->seq
;
295 ldout(cct
,10) << " will send " << *p
<< dendl
;
299 return ceph::make_message
<MLog
>(monmap
->get_fsid(),
// Deliver pending log entries to the local monitor.  The caller must hold
// log_lock, and this entity must itself be a monitor (both asserted).  The
// message from _get_mon_log_message() is sent over the messenger's
// loopback connection with send_message2().
// NOTE(review): original line 306 is not visible in this listing; confirm
// whether an additional check precedes the second assert.
303 void LogClient::_send_to_mon()
305 ceph_assert(ceph_mutex_is_locked(log_lock
));
307 ceph_assert(messenger
->get_myname().is_mon());
308 ldout(cct
,10) << __func__
<< " log to self" << dendl
;
309 auto log
= _get_mon_log_message();
310 messenger
->get_loopback_connection()->send_message2(std::move(log
));
// Queue `entry` for delivery.  Under log_lock, last_log is pre-incremented
// and written into entry.seq (so the caller's object is updated too), and
// the entry is then appended to log_queue.
// NOTE(review): the rest of the function (original lines 318-324),
// including the returned version_t, is not visible in this listing.
313 version_t
LogClient::queue(LogEntry
&entry
)
315 std::lock_guard
l(log_lock
);
316 entry
.seq
= ++last_log
;
317 log_queue
.push_back(entry
);
// Return a sequence number for an entry that is not being queued.  The
// function takes log_lock first.
// NOTE(review): the return statement is not visible in this listing, so
// how the value is derived from last_log must be confirmed in the full file.
326 uint64_t LogClient::get_next_seq()
328 std::lock_guard
l(log_lock
);
// Return this entity's address vector, as reported by the messenger.
332 entity_addrvec_t
LogClient::get_myaddrs()
334 return messenger
->get_myaddrs();
// Return this entity's entity_name_t.  Note the naming: the value comes
// from messenger->get_myname(), not from a "rank" accessor.
337 entity_name_t
LogClient::get_myrank()
339 return messenger
->get_myname();
// Return a reference to the EntityName held in the configuration
// (cct->_conf->name).
342 const EntityName
& LogClient::get_myname()
344 return cct
->_conf
->name
;
// Process a log acknowledgement `m`.  Under log_lock, the queue is walked
// from the front and entries are erased; m->last is the highest sequence
// number covered by the ack and each entry's seq is compared against it.
//
// NOTE(review): the statement executed when entry.seq > last (presumably
// what stops the walk), the closing braces and the returned bool are not
// visible in this listing -- confirm against the complete file.
347 bool LogClient::handle_log_ack(MLogAck
*m
)
349 std::lock_guard
l(log_lock
);
350 ldout(cct
,10) << "handle_log_ack " << *m
<< dendl
;
352 version_t last
= m
->last
;
354 deque
<LogEntry
>::iterator q
= log_queue
.begin();
355 while (q
!= log_queue
.end()) {
356 const LogEntry
&entry(*q
);
// Entries newer than the acknowledged sequence number are tested here.
357 if (entry
.seq
> last
)
359 ldout(cct
,10) << " logged " << entry
<< dendl
;
// erase() returns the iterator to the following element, so the walk
// continues without a separate increment.
360 q
= log_queue
.erase(q
);