1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "common/LogClient.h"
16 #include "include/str_map.h"
17 #include "messages/MLog.h"
18 #include "messages/MLogAck.h"
19 #include "msg/Messenger.h"
20 #include "mon/MonMap.h"
21 #include "common/Graylog.h"
23 #define dout_subsys ceph_subsys_monc
// Fills the caller-supplied clog option maps from the configuration held by
// cct.  Each clog_* option string is handed to get_conf_str_map_helper()
// together with the map it populates and CLOG_CONFIG_DEFAULT_KEY, and every
// option has a matching "error parsing '<option>'" message written via lderr.
// At the end, `fsid` is read from the "fsid" config value and `host` from the
// config's host field.
// NOTE(review): this listing omits some source lines (the declarations of the
// fsid/host parameters and of `oss`, the conditions guarding each lderr, and
// the return statements) -- presumably a negative r is logged and returned;
// confirm against the full file.
25 int parse_log_client_options(CephContext
*cct
,
26 map
<string
,string
> &log_to_monitors
,
27 map
<string
,string
> &log_to_syslog
,
28 map
<string
,string
> &log_channels
,
29 map
<string
,string
> &log_prios
,
30 map
<string
,string
> &log_to_graylog
,
31 map
<string
,string
> &log_to_graylog_host
,
32 map
<string
,string
> &log_to_graylog_port
,
38 int r
= get_conf_str_map_helper(cct
->_conf
->clog_to_monitors
, oss
,
39 &log_to_monitors
, CLOG_CONFIG_DEFAULT_KEY
);
41 lderr(cct
) << __func__
<< " error parsing 'clog_to_monitors'" << dendl
;
45 r
= get_conf_str_map_helper(cct
->_conf
->clog_to_syslog
, oss
,
46 &log_to_syslog
, CLOG_CONFIG_DEFAULT_KEY
);
48 lderr(cct
) << __func__
<< " error parsing 'clog_to_syslog'" << dendl
;
// the syslog facility option is stored in the log_channels map
52 r
= get_conf_str_map_helper(cct
->_conf
->clog_to_syslog_facility
, oss
,
53 &log_channels
, CLOG_CONFIG_DEFAULT_KEY
);
55 lderr(cct
) << __func__
<< " error parsing 'clog_to_syslog_facility'" << dendl
;
// the syslog level option is stored in the log_prios map
59 r
= get_conf_str_map_helper(cct
->_conf
->clog_to_syslog_level
, oss
,
60 &log_prios
, CLOG_CONFIG_DEFAULT_KEY
);
62 lderr(cct
) << __func__
<< " error parsing 'clog_to_syslog_level'" << dendl
;
66 r
= get_conf_str_map_helper(cct
->_conf
->clog_to_graylog
, oss
,
67 &log_to_graylog
, CLOG_CONFIG_DEFAULT_KEY
);
69 lderr(cct
) << __func__
<< " error parsing 'clog_to_graylog'" << dendl
;
73 r
= get_conf_str_map_helper(cct
->_conf
->clog_to_graylog_host
, oss
,
74 &log_to_graylog_host
, CLOG_CONFIG_DEFAULT_KEY
);
76 lderr(cct
) << __func__
<< " error parsing 'clog_to_graylog_host'" << dendl
;
80 r
= get_conf_str_map_helper(cct
->_conf
->clog_to_graylog_port
, oss
,
81 &log_to_graylog_port
, CLOG_CONFIG_DEFAULT_KEY
);
83 lderr(cct
) << __func__
<< " error parsing 'clog_to_graylog_port'" << dendl
;
// hand the cluster fsid and local host name back to the caller
87 fsid
= cct
->_conf
.get_val
<uuid_d
>("fsid");
88 host
= cct
->_conf
->host
;
93 #define dout_prefix _prefix(_dout, this)
94 static ostream
& _prefix(std::ostream
*_dout
, LogClient
*logc
) {
95 return *_dout
<< "log_client ";
98 static ostream
& _prefix(std::ostream
*_dout
, LogChannel
*lc
) {
99 return *_dout
<< "log_channel(" << lc
->get_log_channel() << ") ";
102 LogChannel::LogChannel(CephContext
*cct
, LogClient
*lc
, const string
&channel
)
103 : cct(cct
), parent(lc
),
104 log_channel(channel
), log_to_syslog(false), log_to_monitors(false)
// Creates a channel with an explicit syslog facility and log priority:
// stores cct, the parent LogClient, the channel name, `prio` and `facility`,
// and starts with syslog and monitor logging both disabled.
// NOTE(review): the line declaring the `prio` parameter and the constructor
// body are not visible in this listing -- confirm against the full file.
108 LogChannel::LogChannel(CephContext
*cct
, LogClient
*lc
,
109 const string
&channel
, const string
&facility
,
111 : cct(cct
), parent(lc
),
112 log_channel(channel
), log_prio(prio
), syslog_facility(facility
),
113 log_to_syslog(false), log_to_monitors(false)
117 LogClient::LogClient(CephContext
*cct
, Messenger
*m
, MonMap
*mm
,
118 enum logclient_flag_t flags
)
119 : cct(cct
), messenger(m
), monmap(mm
), is_mon(flags
& FLAG_MON
),
120 last_log_sent(0), last_log(0)
124 LogClientTemp::LogClientTemp(clog_type type_
, LogChannel
&parent_
)
125 : type(type_
), parent(parent_
)
129 LogClientTemp::LogClientTemp(const LogClientTemp
&rhs
)
130 : type(rhs
.type
), parent(rhs
.parent
)
132 // don't want to-- nor can we-- copy the ostringstream
135 LogClientTemp::~LogClientTemp()
137 if (ss
.peek() != EOF
)
138 parent
.do_log(type
, ss
);
// Re-reads this channel's settings from the parsed clog option maps.
// Every map is queried with get_str_map_key() using this channel's name
// (log_channel) and &CLOG_CONFIG_DEFAULT_KEY; the boolean options count as
// enabled only when the looked-up string is exactly "true".  The results are
// applied through set_log_to_monitors(), set_log_to_syslog() and
// set_syslog_facility(), and the Graylog sink is created and configured as
// needed.
// NOTE(review): some source lines are not visible in this listing (the
// fsid/host parameter declarations, what is done with `prio`, the body of the
// "shouldn't but is" branch, and the ends of the ldout statements) -- confirm
// against the full file.
141 void LogChannel::update_config(map
<string
,string
> &log_to_monitors
,
142 map
<string
,string
> &log_to_syslog
,
143 map
<string
,string
> &log_channels
,
144 map
<string
,string
> &log_prios
,
145 map
<string
,string
> &log_to_graylog
,
146 map
<string
,string
> &log_to_graylog_host
,
147 map
<string
,string
> &log_to_graylog_port
,
151 ldout(cct
, 20) << __func__
<< " log_to_monitors " << log_to_monitors
152 << " log_to_syslog " << log_to_syslog
153 << " log_channels " << log_channels
154 << " log_prios " << log_prios
// look up this channel's value in each option map
156 bool to_monitors
= (get_str_map_key(log_to_monitors
, log_channel
,
157 &CLOG_CONFIG_DEFAULT_KEY
) == "true");
158 bool to_syslog
= (get_str_map_key(log_to_syslog
, log_channel
,
159 &CLOG_CONFIG_DEFAULT_KEY
) == "true");
160 string syslog_facility
= get_str_map_key(log_channels
, log_channel
,
161 &CLOG_CONFIG_DEFAULT_KEY
);
162 string prio
= get_str_map_key(log_prios
, log_channel
,
163 &CLOG_CONFIG_DEFAULT_KEY
);
164 bool to_graylog
= (get_str_map_key(log_to_graylog
, log_channel
,
165 &CLOG_CONFIG_DEFAULT_KEY
) == "true");
166 string graylog_host
= get_str_map_key(log_to_graylog_host
, log_channel
,
167 &CLOG_CONFIG_DEFAULT_KEY
);
168 string graylog_port_str
= get_str_map_key(log_to_graylog_port
, log_channel
,
169 &CLOG_CONFIG_DEFAULT_KEY
);
// atoi() yields 0 when the string is not numeric; a port of 0 is treated
// below as "no destination configured"
170 int graylog_port
= atoi(graylog_port_str
.c_str());
172 set_log_to_monitors(to_monitors
);
173 set_log_to_syslog(to_syslog
);
174 set_syslog_facility(syslog_facility
);
// create the Graylog sink the first time graylog logging is enabled
177 if (to_graylog
&& !graylog
) { /* should but isn't */
178 graylog
= std::make_shared
<ceph::logging::Graylog
>("clog");
179 } else if (!to_graylog
&& graylog
) { /* shouldn't but is */
183 if (to_graylog
&& graylog
) {
184 graylog
->set_fsid(fsid
);
185 graylog
->set_hostname(host
);
// only set a destination when both a host and a non-zero port are available
188 if (graylog
&& (!graylog_host
.empty()) && (graylog_port
!= 0)) {
189 graylog
->set_destination(graylog_host
, graylog_port
);
192 ldout(cct
, 10) << __func__
193 << " to_monitors: " << (to_monitors
? "true" : "false")
194 << " to_syslog: " << (to_syslog
? "true" : "false")
195 << " syslog_facility: " << syslog_facility
197 << " to_graylog: " << (to_graylog
? "true" : "false")
198 << " graylog_host: " << graylog_host
199 << " graylog_port: " << graylog_port
// Overload of do_log() that takes the message as a stringstream.
// NOTE(review): the body is not visible in this listing; presumably it
// forwards the stream's contents to the std::string overload -- confirm
// against the full file.
203 void LogChannel::do_log(clog_type prio
, std::stringstream
& ss
)
// Emits the message `s` at priority `prio` on this channel.
// Takes channel_lock via std::lock_guard.  The message is written to the
// local debug log (ldout level -1 next to the CLOG_ERROR test, level 0 in the
// other visible ldout), then a LogEntry `e` gets the current time, the parent
// client's addrs/name/rank and this channel's name.  e.seq is set from
// parent->queue(e) under the log_to_monitors test; the other visible
// assignment takes it from parent->get_next_seq().  The entry is then passed
// to syslog and/or graylog when do_log_to_syslog() / do_log_to_graylog() say
// so.
// NOTE(review): lines between the visible ones are missing from this listing
// (the declaration of `e`, else keywords, closing braces, possibly further
// field assignments) -- confirm against the full file.
213 void LogChannel::do_log(clog_type prio
, const std::string
& s
)
215 std::lock_guard
l(channel_lock
);
216 if (CLOG_ERROR
== prio
) {
217 ldout(cct
,-1) << "log " << prio
<< " : " << s
<< dendl
;
219 ldout(cct
,0) << "log " << prio
<< " : " << s
<< dendl
;
222 e
.stamp
= ceph_clock_now();
223 // seq and who should be set for syslog/graylog/log_to_mon
224 e
.addrs
= parent
->get_myaddrs();
225 e
.name
= parent
->get_myname();
226 e
.rank
= parent
->get_myrank();
229 e
.channel
= get_log_channel();
// the sequence number comes from the parent LogClient in either case
232 if (log_to_monitors
) {
233 e
.seq
= parent
->queue(e
);
235 e
.seq
= parent
->get_next_seq();
239 if (do_log_to_syslog()) {
240 ldout(cct
,0) << __func__
<< " log to syslog" << dendl
;
241 e
.log_to_syslog(get_log_prio(), get_syslog_facility());
245 if (do_log_to_graylog()) {
246 ldout(cct
,0) << __func__
<< " log to graylog" << dendl
;
247 graylog
->log_log_entry(&e
);
// Returns the result of _get_mon_log_message(), called with log_lock held.
// The visible code can first move last_log_sent to the seq of the entry at
// the front of log_queue.
// NOTE(review): the lines that test `flush` and handle the empty-queue case
// are not visible in this listing -- presumably the reset of last_log_sent
// happens only when flush is true; confirm against the full file.
251 Message
*LogClient::get_mon_log_message(bool flush
)
253 std::lock_guard
l(log_lock
);
255 if (log_queue
.empty())
258 last_log_sent
= log_queue
.front().seq
;
260 return _get_mon_log_message();
263 bool LogClient::are_pending()
265 std::lock_guard
l(log_lock
);
266 return last_log
> last_log_sent
;
// Builds a message from log entries that have not been sent yet.
// log_lock must already be held (asserted).  num_unsent is
// last_log - last_log_sent; when mon_client_max_log_entries_per_message is
// positive, num_send is capped at that value.  The iterator p walks log_queue
// past entries whose seq is <= last_log_sent, last_log_sent is updated to the
// seq of an entry being sent, and the local deque `o` is swapped into the
// entries of a newly allocated MLog created with the monmap's fsid.
// NOTE(review): several source lines are missing from this listing (the
// statements under the early `if`s, the declaration of num_send, the loop
// bodies that advance p and fill `o`, and the final return) -- confirm
// against the full file.
269 Message
*LogClient::_get_mon_log_message()
271 ceph_assert(ceph_mutex_is_locked(log_lock
));
272 if (log_queue
.empty())
275 // only send entries that haven't been sent yet during this mon
276 // session! monclient needs to call reset_session() on mon session
277 // reset for this to work right.
279 if (last_log_sent
== last_log
)
282 // limit entries per message
283 unsigned num_unsent
= last_log
- last_log_sent
;
285 if (cct
->_conf
->mon_client_max_log_entries_per_message
> 0)
286 num_send
= std::min(num_unsent
, (unsigned)cct
->_conf
->mon_client_max_log_entries_per_message
);
288 num_send
= num_unsent
;
290 ldout(cct
,10) << " log_queue is " << log_queue
.size() << " last_log " << last_log
<< " sent " << last_log_sent
291 << " num " << log_queue
.size()
292 << " unsent " << num_unsent
293 << " sending " << num_send
<< dendl
;
// every unsent entry is expected to still be in the queue
294 ceph_assert(num_unsent
<= log_queue
.size());
295 std::deque
<LogEntry
>::iterator p
= log_queue
.begin();
296 std::deque
<LogEntry
> o
;
// skip over entries that were already sent
297 while (p
->seq
<= last_log_sent
) {
299 ceph_assert(p
!= log_queue
.end());
302 ceph_assert(p
!= log_queue
.end());
304 last_log_sent
= p
->seq
;
305 ldout(cct
,10) << " will send " << *p
<< dendl
;
309 MLog
*log
= new MLog(monmap
->get_fsid());
310 log
->entries
.swap(o
);
// Sends pending log entries to the monitor in this same process: builds the
// message with _get_mon_log_message() and sends it over the messenger's
// loopback connection.  log_lock must already be held and the messenger's
// entity name must be a monitor (both asserted).
// NOTE(review): one source line between the two visible asserts is missing
// from this listing.  Also, `log` is passed to send_message() without a
// visible null check -- confirm _get_mon_log_message() cannot return null on
// this path.
315 void LogClient::_send_to_mon()
317 ceph_assert(ceph_mutex_is_locked(log_lock
));
319 ceph_assert(messenger
->get_myname().is_mon());
320 ldout(cct
,10) << __func__
<< " log to self" << dendl
;
321 Message
*log
= _get_mon_log_message();
322 messenger
->get_loopback_connection()->send_message(log
);
// Under log_lock, gives `entry` the next sequence number (last_log is
// pre-incremented and stored in entry.seq) and appends the entry to
// log_queue.
// NOTE(review): the rest of the body, including the return statement, is not
// visible in this listing -- do_log() assigns the return value to e.seq, so
// it presumably returns entry.seq; confirm against the full file.
325 version_t
LogClient::queue(LogEntry
&entry
)
327 std::lock_guard
l(log_lock
);
328 entry
.seq
= ++last_log
;
329 log_queue
.push_back(entry
);
// Takes log_lock before producing a sequence number.
// NOTE(review): the return statement is not visible in this listing;
// presumably it advances and returns last_log -- confirm against the full
// file.
338 uint64_t LogClient::get_next_seq()
340 std::lock_guard
l(log_lock
);
344 entity_addrvec_t
LogClient::get_myaddrs()
346 return messenger
->get_myaddrs();
349 entity_name_t
LogClient::get_myrank()
351 return messenger
->get_myname();
354 const EntityName
& LogClient::get_myname()
356 return cct
->_conf
->name
;
// Handles a log acknowledgement `m`.  Under log_lock, walks log_queue from
// the front, comparing each entry's seq with m->last; entries that are
// processed are reported with a " logged " debug line and erased from the
// queue.
// NOTE(review): the statement controlled by `entry.seq > last` and the end of
// the function are not visible in this listing -- presumably the walk stops
// at the first entry newer than the ack; confirm against the full file.
359 bool LogClient::handle_log_ack(MLogAck
*m
)
361 std::lock_guard
l(log_lock
);
362 ldout(cct
,10) << "handle_log_ack " << *m
<< dendl
;
364 version_t last
= m
->last
;
366 deque
<LogEntry
>::iterator q
= log_queue
.begin();
367 while (q
!= log_queue
.end()) {
368 const LogEntry
&entry(*q
);
369 if (entry
.seq
> last
)
371 ldout(cct
,10) << " logged " << entry
<< dendl
;
// erase() returns the iterator to the following element
372 q
= log_queue
.erase(q
);