1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "common/LogClient.h"
16 #include "include/str_map.h"
17 #include "messages/MLog.h"
18 #include "messages/MLogAck.h"
19 #include "msg/Messenger.h"
20 #include "mon/MonMap.h"
21 #include "common/Graylog.h"
// Selects ceph_subsys_monc as the debug-output subsystem for this file
// (presumably consumed by the ldout() calls below - confirm in the dout headers).
23 #define dout_subsys ceph_subsys_monc
// Lets the rest of the file name ostringstream without the std:: qualifier.
27 using std::ostringstream
;
// dout_prefix expands to _prefix(_dout, this); this overload is the one
// selected when `this` is a LogClient*.
// It writes the fixed tag "log_client " to *_dout and returns that stream.
// The logc argument is not read in the visible body.
// NOTE(review): the closing brace of this function is not visible in this view.
31 #define dout_prefix _prefix(_dout, this)
32 static ostream
& _prefix(std::ostream
*_dout
, LogClient
*logc
) {
33 return *_dout
<< "log_client ";
// Prefix helper overload for LogChannel: writes "log_channel(<name>) " to
// *_dout, where <name> is whatever lc->get_log_channel() returns, and returns
// the stream.
// NOTE(review): the closing brace of this function is not visible in this view.
36 static ostream
& _prefix(std::ostream
*_dout
, LogChannel
*lc
) {
37 return *_dout
<< "log_channel(" << lc
->get_log_channel() << ") ";
// Constructs a channel named `channel` that keeps the given context (cct) and
// its owning LogClient (lc, stored as `parent`).
// Both output targets start disabled: log_to_syslog and log_to_monitors are
// initialised to false. log_prio and syslog_facility are not mentioned in
// this initializer list.
// NOTE(review): the constructor body/braces are not visible in this view.
40 LogChannel::LogChannel(CephContext
*cct
, LogClient
*lc
, const string
&channel
)
41 : cct(cct
), parent(lc
),
42 log_channel(channel
), log_to_syslog(false), log_to_monitors(false)
// Constructs a channel like the three-argument form, and additionally stores
// a syslog facility (`facility`) and a log priority (`prio`).
// Both output targets still start disabled (false).
// NOTE(review): the line declaring the `prio` parameter and the constructor
// body/braces are not visible in this view; `prio` is only seen where it
// initialises log_prio.
46 LogChannel::LogChannel(CephContext
*cct
, LogClient
*lc
,
47 const string
&channel
, const string
&facility
,
49 : cct(cct
), parent(lc
),
50 log_channel(channel
), log_prio(prio
), syslog_facility(facility
),
51 log_to_syslog(false), log_to_monitors(false)
// Constructs the log client from a context (cct), a Messenger (m) and a
// MonMap (mm), all stored as given.
// is_mon is derived from `flags`: it is set from the FLAG_MON bit.
// Both sequence counters, last_log_sent and last_log, start at 0.
// NOTE(review): the constructor body/braces are not visible in this view.
55 LogClient::LogClient(CephContext
*cct
, Messenger
*m
, MonMap
*mm
,
56 enum logclient_flag_t flags
)
57 : cct(cct
), messenger(m
), monmap(mm
), is_mon(flags
& FLAG_MON
),
58 last_log_sent(0), last_log(0)
// Setter for the log_to_monitors flag.
// Work is only done when the requested value `v` differs from the current
// flag, so calling it with the current value is a no-op as far as the visible
// code shows.
// NOTE(review): the statements inside the `if` are not visible in this view.
62 void LogChannel::set_log_to_monitors(bool v
)
64 if (log_to_monitors
!= v
) {
// Applies an already-parsed set of cluster-log target settings to this
// channel: the monitor and syslog toggles, the syslog facility and priority,
// and the optional Graylog sink.
//
// conf_strings carries every setting as text; this function converts them.
// NOTE(review): several lines (closing braces, the tail of the ldout
// statements, the body of the "shouldn't but is" branch) are not visible in
// this view.
70 void LogChannel::update_config(const clog_targets_conf_t
& conf_strings
)
// Trace the raw string settings at debug level 20 before interpreting them.
72 ldout(cct
, 20) << __func__
<< " log_to_monitors " << conf_strings
.log_to_monitors
73 << " log_to_syslog " << conf_strings
.log_to_syslog
74 << " log_channels " << conf_strings
.log_channels
75 << " log_prios " << conf_strings
.log_prios
// A target is enabled only when its setting is exactly the text "true";
// any other spelling (e.g. "1", "True") leaves it disabled.
78 bool to_monitors
= (conf_strings
.log_to_monitors
== "true");
79 bool to_syslog
= (conf_strings
.log_to_syslog
== "true");
80 bool to_graylog
= (conf_strings
.log_to_graylog
== "true");
// atoi() yields 0 when the text is not numeric; a port of 0 is treated
// further down as "no destination configured".
81 auto graylog_port
= atoi(conf_strings
.log_to_graylog_port
.c_str());
83 set_log_to_monitors(to_monitors
);
84 set_log_to_syslog(to_syslog
);
// Note: the syslog facility is taken from the log_channels field.
85 set_syslog_facility(conf_strings
.log_channels
);
86 set_log_prio(conf_strings
.log_prios
);
// Bring the Graylog sink in line with the setting: create it (named "clog")
// when it is wanted but absent.
88 if (to_graylog
&& !graylog
) { /* should but isn't */
89 graylog
= std::make_shared
<ceph::logging::Graylog
>("clog");
90 } else if (!to_graylog
&& graylog
) { /* shouldn't but is */
// When Graylog is enabled and the sink exists, refresh its fsid and hostname
// from the configuration.
94 if (to_graylog
&& graylog
) {
95 graylog
->set_fsid(conf_strings
.fsid
);
96 graylog
->set_hostname(conf_strings
.host
);
// The destination is only (re)set when a sink exists, the host is non-empty
// and the port parsed to something other than 0.
99 if (graylog
&& !conf_strings
.log_to_graylog_host
.empty() && (graylog_port
!= 0)) {
100 graylog
->set_destination(conf_strings
.log_to_graylog_host
, graylog_port
);
// Summarise the interpreted settings at debug level 10.
103 ldout(cct
, 10) << __func__
104 << " to_monitors: " << (to_monitors
? "true" : "false")
105 << " to_syslog: " << (to_syslog
? "true" : "false")
106 << " syslog_facility: " << conf_strings
.log_channels
107 << " prio: " << conf_strings
.log_prios
108 << " to_graylog: " << (to_graylog
? "true" : "false")
109 << " graylog_host: " << conf_strings
.log_to_graylog_host
110 << " graylog_port: " << graylog_port
// Reads this channel's cluster-log settings from conf_cct via
// parse_log_client_options(), applies them with update_config(), and returns
// the parsed settings to the caller.
// NOTE(review): the function braces are not visible in this view.
114 clog_targets_conf_t
LogChannel::parse_client_options(CephContext
* conf_cct
)
116 auto parsed_options
= parse_log_client_options(conf_cct
);
117 update_config(parsed_options
);
118 return parsed_options
;
// Collects this channel's cluster-log settings from the configuration held by
// `cct` into a clog_targets_conf_t.
//
// Each clog_to_* option is fetched as a string and passed to
// get_value_via_strmap() together with this channel's name (log_channel) and
// CLOG_CONFIG_DEFAULT_KEY. Presumably that helper picks the per-channel value
// and falls back to the default key - confirm against include/str_map.h.
// NOTE(review): the function braces and the final return are not visible in
// this view.
121 clog_targets_conf_t
LogChannel::parse_log_client_options(CephContext
* cct
)
123 clog_targets_conf_t targets
;
125 targets
.log_to_monitors
=
126 get_value_via_strmap(cct
->_conf
.get_val
<string
>("clog_to_monitors"),
127 log_channel
, CLOG_CONFIG_DEFAULT_KEY
);
128 targets
.log_to_syslog
=
129 get_value_via_strmap(cct
->_conf
.get_val
<string
>("clog_to_syslog"),
130 log_channel
, CLOG_CONFIG_DEFAULT_KEY
);
// The syslog facility option is stored in the log_channels field.
131 targets
.log_channels
=
132 get_value_via_strmap(cct
->_conf
.get_val
<string
>("clog_to_syslog_facility"),
133 log_channel
, CLOG_CONFIG_DEFAULT_KEY
);
// NOTE(review): the assignment target for the syslog level lookup is not
// visible in this view (update_config reads a log_prios field - confirm).
135 get_value_via_strmap(cct
->_conf
.get_val
<string
>("clog_to_syslog_level"),
136 log_channel
, CLOG_CONFIG_DEFAULT_KEY
);
137 targets
.log_to_graylog
=
138 get_value_via_strmap(cct
->_conf
.get_val
<string
>("clog_to_graylog"),
139 log_channel
, CLOG_CONFIG_DEFAULT_KEY
);
140 targets
.log_to_graylog_host
=
141 get_value_via_strmap(cct
->_conf
.get_val
<string
>("clog_to_graylog_host"),
142 log_channel
, CLOG_CONFIG_DEFAULT_KEY
);
// The port is kept as text here; update_config() converts it with atoi().
143 targets
.log_to_graylog_port
=
144 get_value_via_strmap(cct
->_conf
.get_val
<string
>("clog_to_graylog_port"),
145 log_channel
, CLOG_CONFIG_DEFAULT_KEY
);
// fsid and host are not per-channel: they are read directly from the config.
147 targets
.fsid
= cct
->_conf
.get_val
<uuid_d
>("fsid");
148 targets
.host
= cct
->_conf
->host
;
// Overload of do_log() that takes the message text as a std::stringstream
// (ss) together with its priority (prio).
// NOTE(review): only the signature is visible in this view; the body is not
// shown, so how ss is consumed cannot be stated from here.
152 void LogChannel::do_log(clog_type prio
, std::stringstream
& ss
)
// Logs one message `s` at priority `prio` on this channel.
//
// Visible steps: take channel_lock, echo the message to the local debug log,
// fill in a log entry `e` (timestamp, addresses, name, rank, channel), obtain
// a sequence number from the parent LogClient, then hand the entry to syslog
// and/or Graylog when those targets are active.
// NOTE(review): several lines are not visible in this view - the declaration
// of `e`, the `else` lines, the closing braces, and any assignments of the
// priority and message text into `e`.
162 void LogChannel::do_log(clog_type prio
, const std::string
& s
)
// Held until the end of the enclosing scope.
164 std::lock_guard
l(channel_lock
);
// Errors are echoed at debug level -1, the other ldout uses level 0
// (presumably the else branch - the else line is not visible here).
165 if (CLOG_ERROR
== prio
) {
166 ldout(cct
,-1) << "log " << prio
<< " : " << s
<< dendl
;
168 ldout(cct
,0) << "log " << prio
<< " : " << s
<< dendl
;
171 e
.stamp
= ceph_clock_now();
172 // seq and who should be set for syslog/graylog/log_to_mon
173 e
.addrs
= parent
->get_myaddrs();
174 e
.name
= parent
->get_myname();
175 e
.rank
= parent
->get_myrank();
178 e
.channel
= get_log_channel();
// With monitor logging on, the entry is queued on the parent and takes the
// sequence number queue() returns; the other assignment (presumably the else
// branch) only asks the parent for the next sequence number.
181 if (log_to_monitors
) {
182 e
.seq
= parent
->queue(e
);
184 e
.seq
= parent
->get_next_seq();
188 if (do_log_to_syslog()) {
189 ldout(cct
,0) << __func__
<< " log to syslog" << dendl
;
190 e
.log_to_syslog(get_log_prio(), get_syslog_facility());
194 if (do_log_to_graylog()) {
195 ldout(cct
,0) << __func__
<< " log to graylog" << dendl
;
196 graylog
->log_log_entry(&e
);
// Public entry point for building the next log message for the monitors.
// Takes log_lock, then delegates to _get_mon_log_message(), which expects the
// lock to be held.
// NOTE(review): the lines using `flush`, the action taken when the queue is
// empty, and the braces are not visible in this view. Presumably the reset of
// last_log_sent below is conditional on `flush` - confirm.
200 ceph::ref_t
<Message
> LogClient::get_mon_log_message(bool flush
)
202 std::lock_guard
l(log_lock
);
204 if (log_queue
.empty())
// Sets the "already sent" mark to the sequence number of the oldest queued
// entry.
207 last_log_sent
= log_queue
.front().seq
;
209 return _get_mon_log_message();
// Reports whether there are queued entries not yet sent: true when last_log
// (highest sequence number assigned) is greater than last_log_sent.
// The comparison is made while holding log_lock.
// NOTE(review): the function braces are not visible in this view.
212 bool LogClient::are_pending()
214 std::lock_guard
l(log_lock
);
215 return last_log
> last_log_sent
;
// Builds a message carrying the queued log entries that have not been sent
// yet, and advances last_log_sent past the entries it selects.
//
// Precondition: the caller already holds log_lock (asserted below).
// NOTE(review): many lines are not visible in this view - the results of the
// two early-exit checks, the declaration of num_send, the `else` line, the
// loop bodies/closing braces and the remaining make_message arguments.
218 ceph::ref_t
<Message
> LogClient::_get_mon_log_message()
220 ceph_assert(ceph_mutex_is_locked(log_lock
));
221 if (log_queue
.empty())
224 // only send entries that haven't been sent yet during this mon
225 // session! monclient needs to call reset_session() on mon session
226 // reset for this to work right.
// Equal counters mean every assigned sequence number has already been sent.
228 if (last_log_sent
== last_log
)
231 // limit entries per message
232 unsigned num_unsent
= last_log
- last_log_sent
;
// A positive mon_client_max_log_entries_per_message caps the batch size; the
// second assignment (presumably the else branch) sends everything unsent.
234 if (cct
->_conf
->mon_client_max_log_entries_per_message
> 0)
235 num_send
= std::min(num_unsent
, (unsigned)cct
->_conf
->mon_client_max_log_entries_per_message
);
237 num_send
= num_unsent
;
// Note: log_queue.size() is printed twice, under "log_queue is" and "num".
239 ldout(cct
,10) << " log_queue is " << log_queue
.size() << " last_log " << last_log
<< " sent " << last_log_sent
240 << " num " << log_queue
.size()
241 << " unsent " << num_unsent
242 << " sending " << num_send
<< dendl
;
// Every unsent sequence number must still be present in the queue.
243 ceph_assert(num_unsent
<= log_queue
.size());
244 std::deque
<LogEntry
>::iterator p
= log_queue
.begin();
// `o` presumably collects the entries chosen for this message - the lines
// that fill it are not visible here.
245 std::deque
<LogEntry
> o
;
// Skip over entries at or below the last sent sequence number.
246 while (p
->seq
<= last_log_sent
) {
248 ceph_assert(p
!= log_queue
.end());
251 ceph_assert(p
!= log_queue
.end());
// Record the selected entry's sequence number as sent.
253 last_log_sent
= p
->seq
;
254 ldout(cct
,10) << " will send " << *p
<< dendl
;
258 return ceph::make_message
<MLog
>(monmap
->get_fsid(),
// Sends the pending log message to this process itself over the messenger's
// loopback connection.
//
// Preconditions (both asserted): the caller holds log_lock, and the
// messenger's own entity name is a monitor.
// NOTE(review): the message returned by _get_mon_log_message() is passed on
// without a visible null check; the lines not shown in this view (e.g. source
// line 265 and the braces) may matter - confirm.
262 void LogClient::_send_to_mon()
264 ceph_assert(ceph_mutex_is_locked(log_lock
));
266 ceph_assert(messenger
->get_myname().is_mon());
267 ldout(cct
,10) << __func__
<< " log to self" << dendl
;
268 auto log
= _get_mon_log_message();
269 messenger
->get_loopback_connection()->send_message2(std::move(log
));
// Appends `entry` to the pending-log queue.
// Under log_lock, the entry is stamped with the next sequence number
// (last_log is pre-incremented and written into entry.seq, so the caller's
// object is modified) and then pushed onto the back of log_queue.
// NOTE(review): the remainder of the function, including the returned
// version_t, is not visible in this view.
272 version_t
LogClient::queue(LogEntry
&entry
)
274 std::lock_guard
l(log_lock
);
275 entry
.seq
= ++last_log
;
276 log_queue
.push_back(entry
);
// Resets the send state under log_lock: last_log_sent is set equal to
// last_log, so are_pending() reports nothing pending afterwards.
// NOTE(review): what happens when the queue is non-empty (the body of the
// `if`) and whether the assignment below sits inside or after that `if` are
// not visible in this view.
285 void LogClient::reset()
287 std::lock_guard
l(log_lock
);
288 if (log_queue
.size()) {
291 last_log_sent
= last_log
;
// Returns a sequence number for an entry that is not queued for the monitors
// (see its use in LogChannel::do_log). The value is produced under log_lock.
// NOTE(review): the return statement is not visible in this view, so how the
// number is derived (and whether last_log is advanced) cannot be stated here.
294 uint64_t LogClient::get_next_seq()
296 std::lock_guard
l(log_lock
);
// Returns this process's address vector, as reported by the messenger.
// NOTE(review): the function braces are not visible in this view.
300 entity_addrvec_t
LogClient::get_myaddrs()
302 return messenger
->get_myaddrs();
// Returns the messenger's entity name (entity_name_t). Despite the "rank" in
// the function name, the value comes from messenger->get_myname().
// NOTE(review): the function braces are not visible in this view.
305 entity_name_t
LogClient::get_myrank()
307 return messenger
->get_myname();
// Returns a const reference to the configured entity name (cct->_conf->name).
// The reference points into the configuration object, not into this client.
// NOTE(review): the function braces are not visible in this view.
310 const EntityName
& LogClient::get_myname()
312 return cct
->_conf
->name
;
315 bool LogClient::handle_log_ack(MLogAck
*m
)
317 std::lock_guard
l(log_lock
);
318 ldout(cct
,10) << "handle_log_ack " << *m
<< dendl
;
320 version_t last
= m
->last
;
322 auto q
= log_queue
.begin();
323 while (q
!= log_queue
.end()) {
324 const LogEntry
&entry(*q
);
325 if (entry
.seq
> last
)
327 ldout(cct
,10) << " logged " << entry
<< dendl
;
328 q
= log_queue
.erase(q
);