#include "common/Graylog.h"
#include "common/errno.h"
#include "common/strtol.h"
-#include "include/assert.h"
+#include "include/ceph_assert.h"
#include "include/str_list.h"
#include "include/str_map.h"
#include "include/compat.h"
{
dout(10) << "create_initial -- creating initial map" << dendl;
LogEntry e;
- e.name = g_conf->name;
+ e.name = g_conf()->name;
+ e.rank = entity_name_t::MON(mon->rank);
+ e.addrs = mon->messenger->get_myaddrs();
e.stamp = ceph_clock_now();
e.prio = CLOG_INFO;
std::stringstream ss;
<< " summary v " << summary.version << dendl;
if (version == summary.version)
return;
- assert(version >= summary.version);
+ ceph_assert(version >= summary.version);
map<string,bufferlist> channel_blog;
if ((latest_full > 0) && (latest_full > summary.version)) {
bufferlist latest_bl;
get_version_full(latest_full, latest_bl);
- assert(latest_bl.length() != 0);
+ ceph_assert(latest_bl.length() != 0);
dout(7) << __func__ << " loading summary e" << latest_full << dendl;
- bufferlist::iterator p = latest_bl.begin();
- ::decode(summary, p);
+ auto p = latest_bl.cbegin();
+ decode(summary, p);
dout(7) << __func__ << " loaded summary e" << summary.version << dendl;
}
while (version > summary.version) {
bufferlist bl;
int err = get_version(summary.version+1, bl);
- assert(err == 0);
- assert(bl.length());
+ ceph_assert(err == 0);
+ ceph_assert(bl.length());
- bufferlist::iterator p = bl.begin();
+ auto p = bl.cbegin();
__u8 v;
- ::decode(v, p);
+ decode(v, p);
while (!p.end()) {
LogEntry le;
le.decode(p);
if (channel.empty()) // keep retrocompatibility
channel = CLOG_CHANNEL_CLUSTER;
- if (g_conf->get_val<bool>("mon_cluster_log_to_stderr")) {
+ if (g_conf().get_val<bool>("mon_cluster_log_to_stderr")) {
cerr << channel << " " << le << std::endl;
}
<< " host:" << channels.log_to_graylog_host << dendl;
}
- string log_file = channels.get_log_file(channel);
- dout(20) << __func__ << " logging for channel '" << channel
- << "' to file '" << log_file << "'" << dendl;
-
- if (!log_file.empty()) {
- string log_file_level = channels.get_log_file_level(channel);
- if (log_file_level.empty()) {
- dout(1) << __func__ << " warning: log file level not defined for"
- << " channel '" << channel << "' yet a log file is --"
- << " will assume lowest level possible" << dendl;
- }
-
- int min = string_to_syslog_level(log_file_level);
- int l = clog_type_to_syslog_level(le.prio);
- if (l <= min) {
- stringstream ss;
- ss << le << "\n";
- // init entry if DNE
- bufferlist &blog = channel_blog[channel];
- blog.append(ss.str());
+ if (g_conf()->mon_cluster_log_to_file) {
+ string log_file = channels.get_log_file(channel);
+ dout(20) << __func__ << " logging for channel '" << channel
+ << "' to file '" << log_file << "'" << dendl;
+
+ if (!log_file.empty()) {
+ string log_file_level = channels.get_log_file_level(channel);
+ if (log_file_level.empty()) {
+ dout(1) << __func__ << " warning: log file level not defined for"
+ << " channel '" << channel << "' yet a log file is --"
+ << " will assume lowest level possible" << dendl;
+ }
+
+ int min = string_to_syslog_level(log_file_level);
+ int l = clog_type_to_syslog_level(le.prio);
+ if (l <= min) {
+ stringstream ss;
+ ss << le << "\n";
+ // init entry if DNE
+ bufferlist &blog = channel_blog[channel];
+ blog.append(ss.str());
+ }
}
}
}
summary.version++;
- summary.prune(g_conf->mon_log_max_summary);
+ summary.prune(g_conf()->mon_log_max_summary);
}
dout(15) << __func__ << " logging for "
bufferlist bl;
dout(10) << __func__ << " v" << version << dendl;
__u8 v = 1;
- ::encode(v, bl);
+ encode(v, bl);
multimap<utime_t,LogEntry>::iterator p;
for (p = pending_log.begin(); p != pending_log.end(); ++p)
p->second.encode(bl, mon->get_quorum_con_features());
void LogMonitor::encode_full(MonitorDBStore::TransactionRef t)
{
dout(10) << __func__ << " log v " << summary.version << dendl;
- assert(get_last_committed() == summary.version);
+ ceph_assert(get_last_committed() == summary.version);
bufferlist summary_bl;
- ::encode(summary, summary_bl, mon->get_quorum_con_features());
+ encode(summary, summary_bl, mon->get_quorum_con_features());
put_version_full(t, summary.version, summary_bl);
put_version_latest_full(t, summary.version);
}
-version_t LogMonitor::get_trim_to()
+version_t LogMonitor::get_trim_to() const
{
if (!mon->is_leader())
return 0;
- unsigned max = g_conf->mon_max_log_epochs;
+ unsigned max = g_conf()->mon_max_log_epochs;
version_t version = get_last_committed();
if (version > max)
return version - max;
case MSG_MON_COMMAND:
try {
return preprocess_command(op);
- }
- catch (const bad_cmd_get& e) {
+ } catch (const bad_cmd_get& e) {
bufferlist bl;
mon->reply_command(op, -EINVAL, e.what(), bl, get_last_committed());
return true;
case MSG_MON_COMMAND:
try {
return prepare_command(op);
- }
- catch (const bad_cmd_get& e) {
+ } catch (const bad_cmd_get& e) {
bufferlist bl;
mon->reply_command(op, -EINVAL, e.what(), bl, get_last_committed());
return true;
dout(10) << "preprocess_log " << *m << " from " << m->get_orig_source() << dendl;
int num_new = 0;
- MonSession *session = m->get_session();
+ MonSession *session = op->get_session();
if (!session)
goto done;
if (!session->is_capable("log", MON_CAP_W)) {
pending_log.insert(pair<utime_t,LogEntry>(p->stamp, *p));
}
}
- pending_summary.prune(g_conf->mon_log_max_summary);
+ pending_summary.prune(g_conf()->mon_log_max_summary);
wait_for_finished_proposal(op, new C_Log(this, op));
return true;
}
bool LogMonitor::should_propose(double& delay)
{
// commit now if we have a lot of pending events
- if (g_conf->mon_max_log_entries_per_event > 0 &&
- pending_log.size() >= (unsigned)g_conf->mon_max_log_entries_per_event)
+ if (g_conf()->mon_max_log_entries_per_event > 0 &&
+ pending_log.size() >= (unsigned)g_conf()->mon_max_log_entries_per_event)
return true;
// otherwise fall back to generic policy
bufferlist rdata;
stringstream ss;
- map<string, cmd_vartype> cmdmap;
+ cmdmap_t cmdmap;
if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
string rs = ss.str();
mon->reply_command(op, -EINVAL, rs, get_last_committed());
return true;
}
- MonSession *session = m->get_session();
+ MonSession *session = op->get_session();
if (!session) {
mon->reply_command(op, -EACCES, "access denied", get_last_committed());
return true;
}
string prefix;
- cmd_getval_throws(g_ceph_context, cmdmap, "prefix", prefix);
+ cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
string format;
- cmd_getval_throws(g_ceph_context, cmdmap, "format", format, string("plain"));
+ cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
boost::scoped_ptr<Formatter> f(Formatter::create(format));
if (prefix == "log last") {
int64_t num = 20;
- cmd_getval_throws(g_ceph_context, cmdmap, "num", num);
+ cmd_getval(g_ceph_context, cmdmap, "num", num);
if (f) {
f->open_array_section("tail");
}
std::string level_str;
clog_type level;
- if (cmd_getval_throws(g_ceph_context, cmdmap, "level", level_str)) {
+ if (cmd_getval(g_ceph_context, cmdmap, "level", level_str)) {
level = LogEntry::str_to_level(level_str);
if (level == CLOG_UNKNOWN) {
ss << "Invalid severity '" << level_str << "'";
}
std::string channel;
- if (!cmd_getval_throws(g_ceph_context, cmdmap, "channel", channel)) {
+ if (!cmd_getval(g_ceph_context, cmdmap, "channel", channel)) {
channel = CLOG_CHANNEL_DEFAULT;
}
// We'll apply this twice, once while counting out lines
// and once while outputting them.
- auto match = [level, channel](const LogEntry &entry) {
- return entry.prio >= level && (entry.channel == channel || channel == "*");
+ auto match = [level](const LogEntry &entry) {
+ return entry.prio >= level;
};
- auto rp = summary.tail.rbegin();
- for (; num > 0 && rp != summary.tail.rend(); ++rp) {
- if (match(*rp)) {
- num--;
- }
- }
- if (rp == summary.tail.rend()) {
- --rp;
- }
+ // Decrement operation that sets to container end when hitting rbegin
ostringstream ss;
- for (; rp != summary.tail.rbegin(); --rp) {
- if (!match(*rp)) {
- continue;
+ if (channel == "*") {
+ list<LogEntry> full_tail;
+ summary.build_ordered_tail(&full_tail);
+ derr << "full " << full_tail << dendl;
+ auto rp = full_tail.rbegin();
+ for (; num > 0 && rp != full_tail.rend(); ++rp) {
+ if (match(*rp)) {
+ num--;
+ }
+ }
+ if (rp == full_tail.rend()) {
+ --rp;
}
- if (f) {
- f->dump_object("entry", *rp);
- } else {
- ss << *rp << "\n";
+ // Decrement a reverse iterator such that going past rbegin()
+ // sets it to rend(). This is for writing a for() loop that
+ // goes up to (and including) rbegin()
+ auto dec = [&rp, &full_tail] () {
+ if (rp == full_tail.rbegin()) {
+ rp = full_tail.rend();
+ } else {
+ --rp;
+ }
+ };
+
+ // Move forward to the end of the container (decrement the reverse
+ // iterator).
+ for (; rp != full_tail.rend(); dec()) {
+ if (!match(*rp)) {
+ continue;
+ }
+ if (f) {
+ f->dump_object("entry", *rp);
+ } else {
+ ss << *rp << "\n";
+ }
+ }
+ } else {
+ auto p = summary.tail_by_channel.find(channel);
+ if (p != summary.tail_by_channel.end()) {
+ auto rp = p->second.rbegin();
+ for (; num > 0 && rp != p->second.rend(); ++rp) {
+ if (match(rp->second)) {
+ num--;
+ }
+ }
+ if (rp == p->second.rend()) {
+ --rp;
+ }
+
+ // Decrement a reverse iterator such that going past rbegin()
+ // sets it to rend(). This is for writing a for() loop that
+ // goes up to (and including) rbegin()
+ auto dec = [&rp, &p] () {
+ if (rp == p->second.rbegin()) {
+ rp = p->second.rend();
+ } else {
+ --rp;
+ }
+ };
+
+ // Move forward to the end of the container (decrement the reverse
+ // iterator).
+ for (; rp != p->second.rend(); dec()) {
+ if (!match(rp->second)) {
+ continue;
+ }
+ if (f) {
+ f->dump_object("entry", rp->second);
+ } else {
+ ss << rp->second << "\n";
+ }
+ }
}
}
if (f) {
string rs;
int err = -EINVAL;
- map<string, cmd_vartype> cmdmap;
+ cmdmap_t cmdmap;
if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
// ss has reason for failure
string rs = ss.str();
}
string prefix;
- cmd_getval_throws(g_ceph_context, cmdmap, "prefix", prefix);
+ cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
- MonSession *session = m->get_session();
+ MonSession *session = op->get_session();
if (!session) {
mon->reply_command(op, -EACCES, "access denied", get_last_committed());
return true;
if (prefix == "log") {
vector<string> logtext;
- cmd_getval_throws(g_ceph_context, cmdmap, "logtext", logtext);
+ cmd_getval(g_ceph_context, cmdmap, "logtext", logtext);
LogEntry le;
- le.who = m->get_orig_source_inst();
+ le.rank = m->get_orig_source();
+ le.addrs.v.push_back(m->get_orig_source_addr());
le.name = session->entity_name;
le.stamp = m->get_recv_stamp();
le.seq = 0;
le.channel = CLOG_CHANNEL_DEFAULT;
le.msg = str_join(logtext, " ");
pending_summary.add(le);
- pending_summary.prune(g_conf->mon_log_max_summary);
+ pending_summary.prune(g_conf()->mon_log_max_summary);
pending_log.insert(pair<utime_t,LogEntry>(le.stamp, le));
wait_for_finished_proposal(op, new Monitor::C_Command(
mon, op, 0, string(), get_last_committed() + 1));
dout(10) << __func__ << " client wants " << s->type << " ver " << s->next << dendl;
int sub_level = sub_name_to_id(s->type);
- assert(sub_level >= 0);
+ ceph_assert(sub_level >= 0);
version_t summary_version = summary.version;
if (s->next > summary_version) {
- dout(10) << __func__ << " client " << s->session->inst
+ dout(10) << __func__ << " client " << s->session->name
<< " requested version (" << s->next << ") is greater than ours ("
<< summary_version << "), which means we already sent him"
<< " everything we have." << dendl;
if (s->next == 0) {
/* First timer, heh? */
- bool ret = _create_sub_summary(mlog, sub_level);
- if (!ret) {
- dout(1) << __func__ << " ret = " << ret << dendl;
- mlog->put();
- return;
- }
+ _create_sub_incremental(mlog, sub_level, get_last_committed());
} else {
/* let us send you an incremental log... */
_create_sub_incremental(mlog, sub_level, s->next);
}
- dout(1) << __func__ << " sending message to " << s->session->inst
+ dout(10) << __func__ << " sending message to " << s->session->name
<< " with " << mlog->entries.size() << " entries"
<< " (version " << mlog->version << ")" << dendl;
s->next = summary_version+1;
}
-/**
- * Create a log message containing only the last message in the summary.
- *
- * @param mlog Log message we'll send to the client.
- * @param level Maximum log level the client is interested in.
- * @return 'true' if we consider we successfully populated @mlog;
- * 'false' otherwise.
- */
-bool LogMonitor::_create_sub_summary(MLog *mlog, int level)
-{
- dout(10) << __func__ << dendl;
-
- assert(mlog != NULL);
-
- if (!summary.tail.size())
- return false;
-
- list<LogEntry>::reverse_iterator it = summary.tail.rbegin();
- for (; it != summary.tail.rend(); ++it) {
- LogEntry e = *it;
- if (e.prio < level)
- continue;
-
- mlog->entries.push_back(e);
- mlog->version = summary.version;
- break;
- }
-
- return true;
-}
-
/**
* Create an incremental log message from version \p sv to \p summary.version
*
while (sv && sv <= summary_ver) {
bufferlist bl;
int err = get_version(sv, bl);
- assert(err == 0);
- assert(bl.length());
- bufferlist::iterator p = bl.begin();
+ ceph_assert(err == 0);
+ ceph_assert(bl.length());
+ auto p = bl.cbegin();
__u8 v;
- ::decode(v,p);
+ decode(v,p);
while (!p.end()) {
LogEntry le;
le.decode(p);
channels.clear();
- int r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_syslog,
+ int r = get_conf_str_map_helper(g_conf()->mon_cluster_log_to_syslog,
oss, &channels.log_to_syslog,
CLOG_CONFIG_DEFAULT_KEY);
if (r < 0) {
return;
}
- r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_syslog_level,
+ r = get_conf_str_map_helper(g_conf()->mon_cluster_log_to_syslog_level,
oss, &channels.syslog_level,
CLOG_CONFIG_DEFAULT_KEY);
if (r < 0) {
return;
}
- r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_syslog_facility,
+ r = get_conf_str_map_helper(g_conf()->mon_cluster_log_to_syslog_facility,
oss, &channels.syslog_facility,
CLOG_CONFIG_DEFAULT_KEY);
if (r < 0) {
return;
}
- r = get_conf_str_map_helper(g_conf->mon_cluster_log_file, oss,
+ r = get_conf_str_map_helper(g_conf()->mon_cluster_log_file, oss,
&channels.log_file,
CLOG_CONFIG_DEFAULT_KEY);
if (r < 0) {
return;
}
- r = get_conf_str_map_helper(g_conf->mon_cluster_log_file_level, oss,
+ r = get_conf_str_map_helper(g_conf()->mon_cluster_log_file_level, oss,
&channels.log_file_level,
CLOG_CONFIG_DEFAULT_KEY);
if (r < 0) {
return;
}
- r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog, oss,
+ r = get_conf_str_map_helper(g_conf()->mon_cluster_log_to_graylog, oss,
&channels.log_to_graylog,
CLOG_CONFIG_DEFAULT_KEY);
if (r < 0) {
return;
}
- r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog_host, oss,
+ r = get_conf_str_map_helper(g_conf()->mon_cluster_log_to_graylog_host, oss,
&channels.log_to_graylog_host,
CLOG_CONFIG_DEFAULT_KEY);
if (r < 0) {
return;
}
- r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog_port, oss,
+ r = get_conf_str_map_helper(g_conf()->mon_cluster_log_to_graylog_port, oss,
&channels.log_to_graylog_port,
CLOG_CONFIG_DEFAULT_KEY);
if (r < 0) {
if (graylogs.count(channel) == 0) {
auto graylog(std::make_shared<ceph::logging::Graylog>("mon"));
- graylog->set_fsid(g_conf->get_val<uuid_d>("fsid"));
- graylog->set_hostname(g_conf->host);
+ graylog->set_fsid(g_conf().get_val<uuid_d>("fsid"));
+ graylog->set_hostname(g_conf()->host);
graylog->set_destination(get_str_map_key(log_to_graylog_host, channel,
&CLOG_CONFIG_DEFAULT_KEY),
atoi(get_str_map_key(log_to_graylog_port, channel,
return graylogs[channel];
}
-void LogMonitor::handle_conf_change(const struct md_config_t *conf,
+void LogMonitor::handle_conf_change(const ConfigProxy& conf,
const std::set<std::string> &changed)
{
if (changed.count("mon_cluster_log_to_syslog") ||