]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mon/LogMonitor.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / mon / LogMonitor.cc
index fc983bceecdb476a1100aaed4af185ec683303c5..2d2ddad54dd96e34ea58bb85a8db64d3273e32dc 100644 (file)
@@ -27,7 +27,7 @@
 #include "common/Graylog.h"
 #include "common/errno.h"
 #include "common/strtol.h"
-#include "include/assert.h"
+#include "include/ceph_assert.h"
 #include "include/str_list.h"
 #include "include/str_map.h"
 #include "include/compat.h"
@@ -62,7 +62,9 @@ void LogMonitor::create_initial()
 {
   dout(10) << "create_initial -- creating initial map" << dendl;
   LogEntry e;
-  e.name = g_conf->name;
+  e.name = g_conf()->name;
+  e.rank = entity_name_t::MON(mon->rank);
+  e.addrs = mon->messenger->get_myaddrs();
   e.stamp = ceph_clock_now();
   e.prio = CLOG_INFO;
   std::stringstream ss;
@@ -80,7 +82,7 @@ void LogMonitor::update_from_paxos(bool *need_bootstrap)
            << " summary v " << summary.version << dendl;
   if (version == summary.version)
     return;
-  assert(version >= summary.version);
+  ceph_assert(version >= summary.version);
 
   map<string,bufferlist> channel_blog;
 
@@ -89,10 +91,10 @@ void LogMonitor::update_from_paxos(bool *need_bootstrap)
   if ((latest_full > 0) && (latest_full > summary.version)) {
     bufferlist latest_bl;
     get_version_full(latest_full, latest_bl);
-    assert(latest_bl.length() != 0);
+    ceph_assert(latest_bl.length() != 0);
     dout(7) << __func__ << " loading summary e" << latest_full << dendl;
-    bufferlist::iterator p = latest_bl.begin();
-    ::decode(summary, p);
+    auto p = latest_bl.cbegin();
+    decode(summary, p);
     dout(7) << __func__ << " loaded summary e" << summary.version << dendl;
   }
 
@@ -100,12 +102,12 @@ void LogMonitor::update_from_paxos(bool *need_bootstrap)
   while (version > summary.version) {
     bufferlist bl;
     int err = get_version(summary.version+1, bl);
-    assert(err == 0);
-    assert(bl.length());
+    ceph_assert(err == 0);
+    ceph_assert(bl.length());
 
-    bufferlist::iterator p = bl.begin();
+    auto p = bl.cbegin();
     __u8 v;
-    ::decode(v, p);
+    decode(v, p);
     while (!p.end()) {
       LogEntry le;
       le.decode(p);
@@ -115,7 +117,7 @@ void LogMonitor::update_from_paxos(bool *need_bootstrap)
       if (channel.empty()) // keep retrocompatibility
         channel = CLOG_CHANNEL_CLUSTER;
 
-      if (g_conf->get_val<bool>("mon_cluster_log_to_stderr")) {
+      if (g_conf().get_val<bool>("mon_cluster_log_to_stderr")) {
        cerr << channel << " " << le << std::endl;
       }
 
@@ -141,26 +143,28 @@ void LogMonitor::update_from_paxos(bool *need_bootstrap)
                << " host:" << channels.log_to_graylog_host << dendl;
       }
 
-      string log_file = channels.get_log_file(channel);
-      dout(20) << __func__ << " logging for channel '" << channel
-               << "' to file '" << log_file << "'" << dendl;
-
-      if (!log_file.empty()) {
-        string log_file_level = channels.get_log_file_level(channel);
-        if (log_file_level.empty()) {
-          dout(1) << __func__ << " warning: log file level not defined for"
-                  << " channel '" << channel << "' yet a log file is --"
-                  << " will assume lowest level possible" << dendl;
-        }
-
-       int min = string_to_syslog_level(log_file_level);
-       int l = clog_type_to_syslog_level(le.prio);
-       if (l <= min) {
-         stringstream ss;
-         ss << le << "\n";
-          // init entry if DNE
-          bufferlist &blog = channel_blog[channel];
-          blog.append(ss.str());
+      if (g_conf()->mon_cluster_log_to_file) {
+       string log_file = channels.get_log_file(channel);
+       dout(20) << __func__ << " logging for channel '" << channel
+                << "' to file '" << log_file << "'" << dendl;
+
+       if (!log_file.empty()) {
+         string log_file_level = channels.get_log_file_level(channel);
+         if (log_file_level.empty()) {
+           dout(1) << __func__ << " warning: log file level not defined for"
+                   << " channel '" << channel << "' yet a log file is --"
+                   << " will assume lowest level possible" << dendl;
+         }
+
+         int min = string_to_syslog_level(log_file_level);
+         int l = clog_type_to_syslog_level(le.prio);
+         if (l <= min) {
+           stringstream ss;
+           ss << le << "\n";
+           // init entry if DNE
+           bufferlist &blog = channel_blog[channel];
+           blog.append(ss.str());
+         }
        }
       }
 
@@ -168,7 +172,7 @@ void LogMonitor::update_from_paxos(bool *need_bootstrap)
     }
 
     summary.version++;
-    summary.prune(g_conf->mon_log_max_summary);
+    summary.prune(g_conf()->mon_log_max_summary);
   }
 
   dout(15) << __func__ << " logging for "
@@ -216,7 +220,7 @@ void LogMonitor::encode_pending(MonitorDBStore::TransactionRef t)
   bufferlist bl;
   dout(10) << __func__ << " v" << version << dendl;
   __u8 v = 1;
-  ::encode(v, bl);
+  encode(v, bl);
   multimap<utime_t,LogEntry>::iterator p;
   for (p = pending_log.begin(); p != pending_log.end(); ++p)
     p->second.encode(bl, mon->get_quorum_con_features());
@@ -228,21 +232,21 @@ void LogMonitor::encode_pending(MonitorDBStore::TransactionRef t)
 void LogMonitor::encode_full(MonitorDBStore::TransactionRef t)
 {
   dout(10) << __func__ << " log v " << summary.version << dendl;
-  assert(get_last_committed() == summary.version);
+  ceph_assert(get_last_committed() == summary.version);
 
   bufferlist summary_bl;
-  ::encode(summary, summary_bl, mon->get_quorum_con_features());
+  encode(summary, summary_bl, mon->get_quorum_con_features());
 
   put_version_full(t, summary.version, summary_bl);
   put_version_latest_full(t, summary.version);
 }
 
-version_t LogMonitor::get_trim_to()
+version_t LogMonitor::get_trim_to() const
 {
   if (!mon->is_leader())
     return 0;
 
-  unsigned max = g_conf->mon_max_log_epochs;
+  unsigned max = g_conf()->mon_max_log_epochs;
   version_t version = get_last_committed();
   if (version > max)
     return version - max;
@@ -258,8 +262,7 @@ bool LogMonitor::preprocess_query(MonOpRequestRef op)
   case MSG_MON_COMMAND:
     try {
       return preprocess_command(op);
-    }
-    catch (const bad_cmd_get& e) {
+    } catch (const bad_cmd_get& e) {
       bufferlist bl;
       mon->reply_command(op, -EINVAL, e.what(), bl, get_last_committed());
       return true;
@@ -283,8 +286,7 @@ bool LogMonitor::prepare_update(MonOpRequestRef op)
   case MSG_MON_COMMAND:
     try {
       return prepare_command(op);
-    }
-    catch (const bad_cmd_get& e) {
+    } catch (const bad_cmd_get& e) {
       bufferlist bl;
       mon->reply_command(op, -EINVAL, e.what(), bl, get_last_committed());
       return true;
@@ -304,7 +306,7 @@ bool LogMonitor::preprocess_log(MonOpRequestRef op)
   dout(10) << "preprocess_log " << *m << " from " << m->get_orig_source() << dendl;
   int num_new = 0;
 
-  MonSession *session = m->get_session();
+  MonSession *session = op->get_session();
   if (!session)
     goto done;
   if (!session->is_capable("log", MON_CAP_W)) {
@@ -364,7 +366,7 @@ bool LogMonitor::prepare_log(MonOpRequestRef op)
       pending_log.insert(pair<utime_t,LogEntry>(p->stamp, *p));
     }
   }
-  pending_summary.prune(g_conf->mon_log_max_summary);
+  pending_summary.prune(g_conf()->mon_log_max_summary);
   wait_for_finished_proposal(op, new C_Log(this, op));
   return true;
 }
@@ -379,8 +381,8 @@ void LogMonitor::_updated_log(MonOpRequestRef op)
 bool LogMonitor::should_propose(double& delay)
 {
   // commit now if we have a lot of pending events
-  if (g_conf->mon_max_log_entries_per_event > 0 &&
-      pending_log.size() >= (unsigned)g_conf->mon_max_log_entries_per_event)
+  if (g_conf()->mon_max_log_entries_per_event > 0 &&
+      pending_log.size() >= (unsigned)g_conf()->mon_max_log_entries_per_event)
     return true;
 
   // otherwise fall back to generic policy
@@ -396,35 +398,35 @@ bool LogMonitor::preprocess_command(MonOpRequestRef op)
   bufferlist rdata;
   stringstream ss;
 
-  map<string, cmd_vartype> cmdmap;
+  cmdmap_t cmdmap;
   if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
     string rs = ss.str();
     mon->reply_command(op, -EINVAL, rs, get_last_committed());
     return true;
   }
-  MonSession *session = m->get_session();
+  MonSession *session = op->get_session();
   if (!session) {
     mon->reply_command(op, -EACCES, "access denied", get_last_committed());
     return true;
   }
 
   string prefix;
-  cmd_getval_throws(g_ceph_context, cmdmap, "prefix", prefix);
+  cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
 
   string format;
-  cmd_getval_throws(g_ceph_context, cmdmap, "format", format, string("plain"));
+  cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
   boost::scoped_ptr<Formatter> f(Formatter::create(format));
 
   if (prefix == "log last") {
     int64_t num = 20;
-    cmd_getval_throws(g_ceph_context, cmdmap, "num", num);
+    cmd_getval(g_ceph_context, cmdmap, "num", num);
     if (f) {
       f->open_array_section("tail");
     }
 
     std::string level_str;
     clog_type level;
-    if (cmd_getval_throws(g_ceph_context, cmdmap, "level", level_str)) {
+    if (cmd_getval(g_ceph_context, cmdmap, "level", level_str)) {
       level = LogEntry::str_to_level(level_str);
       if (level == CLOG_UNKNOWN) {
         ss << "Invalid severity '" << level_str << "'";
@@ -436,35 +438,91 @@ bool LogMonitor::preprocess_command(MonOpRequestRef op)
     }
 
     std::string channel;
-    if (!cmd_getval_throws(g_ceph_context, cmdmap, "channel", channel)) {
+    if (!cmd_getval(g_ceph_context, cmdmap, "channel", channel)) {
       channel = CLOG_CHANNEL_DEFAULT;
     }
 
     // We'll apply this twice, once while counting out lines
     // and once while outputting them.
-    auto match = [level, channel](const LogEntry &entry) {
-      return entry.prio >= level && (entry.channel == channel || channel == "*");
+    auto match = [level](const LogEntry &entry) {
+      return entry.prio >= level;
     };
 
-    auto rp = summary.tail.rbegin();
-    for (; num > 0 && rp != summary.tail.rend(); ++rp) {
-      if (match(*rp)) {
-        num--;
-      }
-    }
-    if (rp == summary.tail.rend()) {
-      --rp;
-    }
+    // Decrement operation that sets to container end when hitting rbegin
     ostringstream ss;
-    for (; rp != summary.tail.rbegin(); --rp) {
-      if (!match(*rp)) {
-        continue;
+    if (channel == "*") {
+      list<LogEntry> full_tail;
+      summary.build_ordered_tail(&full_tail);
+      derr << "full " << full_tail << dendl;
+      auto rp = full_tail.rbegin();
+      for (; num > 0 && rp != full_tail.rend(); ++rp) {
+       if (match(*rp)) {
+         num--;
+       }
+      }
+      if (rp == full_tail.rend()) {
+       --rp;
       }
 
-      if (f) {
-       f->dump_object("entry", *rp);
-      } else {
-       ss << *rp << "\n";
+      // Decrement a reverse iterator such that going past rbegin()
+      // sets it to rend().  This is for writing a for() loop that
+      // goes up to (and including) rbegin()
+      auto dec = [&rp, &full_tail] () {
+        if (rp == full_tail.rbegin()) {
+          rp = full_tail.rend();
+        } else {
+          --rp;
+        }
+      };
+
+      // Move forward to the end of the container (decrement the reverse
+      // iterator).
+      for (; rp != full_tail.rend(); dec()) {
+       if (!match(*rp)) {
+         continue;
+       }
+       if (f) {
+         f->dump_object("entry", *rp);
+       } else {
+         ss << *rp << "\n";
+       }
+      }
+    } else {
+      auto p = summary.tail_by_channel.find(channel);
+      if (p != summary.tail_by_channel.end()) {
+       auto rp = p->second.rbegin();
+       for (; num > 0 && rp != p->second.rend(); ++rp) {
+         if (match(rp->second)) {
+           num--;
+         }
+       }
+       if (rp == p->second.rend()) {
+         --rp;
+       }
+
+        // Decrement a reverse iterator such that going past rbegin()
+        // sets it to rend().  This is for writing a for() loop that
+        // goes up to (and including) rbegin()
+        auto dec = [&rp, &p] () {
+          if (rp == p->second.rbegin()) {
+            rp = p->second.rend();
+          } else {
+            --rp;
+          }
+        };
+
+        // Move forward to the end of the container (decrement the reverse
+        // iterator).
+       for (; rp != p->second.rend(); dec()) {
+         if (!match(rp->second)) {
+           continue;
+         }
+         if (f) {
+           f->dump_object("entry", rp->second);
+         } else {
+           ss << rp->second << "\n";
+         }
+       }
       }
     }
     if (f) {
@@ -493,7 +551,7 @@ bool LogMonitor::prepare_command(MonOpRequestRef op)
   string rs;
   int err = -EINVAL;
 
-  map<string, cmd_vartype> cmdmap;
+  cmdmap_t cmdmap;
   if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
     // ss has reason for failure
     string rs = ss.str();
@@ -502,9 +560,9 @@ bool LogMonitor::prepare_command(MonOpRequestRef op)
   }
 
   string prefix;
-  cmd_getval_throws(g_ceph_context, cmdmap, "prefix", prefix);
+  cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
 
-  MonSession *session = m->get_session();
+  MonSession *session = op->get_session();
   if (!session) {
     mon->reply_command(op, -EACCES, "access denied", get_last_committed());
     return true;
@@ -512,9 +570,10 @@ bool LogMonitor::prepare_command(MonOpRequestRef op)
 
   if (prefix == "log") {
     vector<string> logtext;
-    cmd_getval_throws(g_ceph_context, cmdmap, "logtext", logtext);
+    cmd_getval(g_ceph_context, cmdmap, "logtext", logtext);
     LogEntry le;
-    le.who = m->get_orig_source_inst();
+    le.rank = m->get_orig_source();
+    le.addrs.v.push_back(m->get_orig_source_addr());
     le.name = session->entity_name;
     le.stamp = m->get_recv_stamp();
     le.seq = 0;
@@ -522,7 +581,7 @@ bool LogMonitor::prepare_command(MonOpRequestRef op)
     le.channel = CLOG_CHANNEL_DEFAULT;
     le.msg = str_join(logtext, " ");
     pending_summary.add(le);
-    pending_summary.prune(g_conf->mon_log_max_summary);
+    pending_summary.prune(g_conf()->mon_log_max_summary);
     pending_log.insert(pair<utime_t,LogEntry>(le.stamp, le));
     wait_for_finished_proposal(op, new Monitor::C_Command(
           mon, op, 0, string(), get_last_committed() + 1));
@@ -562,11 +621,11 @@ void LogMonitor::check_sub(Subscription *s)
   dout(10) << __func__ << " client wants " << s->type << " ver " << s->next << dendl;
 
   int sub_level = sub_name_to_id(s->type);
-  assert(sub_level >= 0);
+  ceph_assert(sub_level >= 0);
 
   version_t summary_version = summary.version;
   if (s->next > summary_version) {
-    dout(10) << __func__ << " client " << s->session->inst 
+    dout(10) << __func__ << " client " << s->session->name
            << " requested version (" << s->next << ") is greater than ours (" 
            << summary_version << "), which means we already sent him" 
            << " everything we have." << dendl;
@@ -577,18 +636,13 @@ void LogMonitor::check_sub(Subscription *s)
 
   if (s->next == 0) { 
     /* First timer, heh? */
-    bool ret = _create_sub_summary(mlog, sub_level);
-    if (!ret) {
-      dout(1) << __func__ << " ret = " << ret << dendl;
-      mlog->put();
-      return;
-    }
+    _create_sub_incremental(mlog, sub_level, get_last_committed());
   } else {
     /* let us send you an incremental log... */
     _create_sub_incremental(mlog, sub_level, s->next);
   }
 
-  dout(1) << __func__ << " sending message to " << s->session->inst 
+  dout(10) << __func__ << " sending message to " << s->session->name
          << " with " << mlog->entries.size() << " entries"
          << " (version " << mlog->version << ")" << dendl;
   
@@ -603,37 +657,6 @@ void LogMonitor::check_sub(Subscription *s)
     s->next = summary_version+1;
 }
 
-/**
- * Create a log message containing only the last message in the summary.
- *
- * @param mlog Log message we'll send to the client.
- * @param level Maximum log level the client is interested in.
- * @return     'true' if we consider we successfully populated @mlog;
- *             'false' otherwise.
- */
-bool LogMonitor::_create_sub_summary(MLog *mlog, int level)
-{
-  dout(10) << __func__ << dendl;
-
-  assert(mlog != NULL);
-
-  if (!summary.tail.size())
-    return false;
-
-  list<LogEntry>::reverse_iterator it = summary.tail.rbegin();
-  for (; it != summary.tail.rend(); ++it) {
-    LogEntry e = *it;
-    if (e.prio < level)
-      continue;
-
-    mlog->entries.push_back(e);
-    mlog->version = summary.version;
-    break;
-  }
-
-  return true;
-}
-
 /**
  * Create an incremental log message from version \p sv to \p summary.version
  *
@@ -664,11 +687,11 @@ void LogMonitor::_create_sub_incremental(MLog *mlog, int level, version_t sv)
   while (sv && sv <= summary_ver) {
     bufferlist bl;
     int err = get_version(sv, bl);
-    assert(err == 0);
-    assert(bl.length());
-    bufferlist::iterator p = bl.begin();
+    ceph_assert(err == 0);
+    ceph_assert(bl.length());
+    auto p = bl.cbegin();
     __u8 v;
-    ::decode(v,p);
+    decode(v,p);
     while (!p.end()) {
       LogEntry le;
       le.decode(p);
@@ -694,7 +717,7 @@ void LogMonitor::update_log_channels()
 
   channels.clear();
 
-  int r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_syslog,
+  int r = get_conf_str_map_helper(g_conf()->mon_cluster_log_to_syslog,
                                   oss, &channels.log_to_syslog,
                                   CLOG_CONFIG_DEFAULT_KEY);
   if (r < 0) {
@@ -702,7 +725,7 @@ void LogMonitor::update_log_channels()
     return;
   }
 
-  r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_syslog_level,
+  r = get_conf_str_map_helper(g_conf()->mon_cluster_log_to_syslog_level,
                               oss, &channels.syslog_level,
                               CLOG_CONFIG_DEFAULT_KEY);
   if (r < 0) {
@@ -711,7 +734,7 @@ void LogMonitor::update_log_channels()
     return;
   }
 
-  r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_syslog_facility,
+  r = get_conf_str_map_helper(g_conf()->mon_cluster_log_to_syslog_facility,
                               oss, &channels.syslog_facility,
                               CLOG_CONFIG_DEFAULT_KEY);
   if (r < 0) {
@@ -720,7 +743,7 @@ void LogMonitor::update_log_channels()
     return;
   }
 
-  r = get_conf_str_map_helper(g_conf->mon_cluster_log_file, oss,
+  r = get_conf_str_map_helper(g_conf()->mon_cluster_log_file, oss,
                               &channels.log_file,
                               CLOG_CONFIG_DEFAULT_KEY);
   if (r < 0) {
@@ -728,7 +751,7 @@ void LogMonitor::update_log_channels()
     return;
   }
 
-  r = get_conf_str_map_helper(g_conf->mon_cluster_log_file_level, oss,
+  r = get_conf_str_map_helper(g_conf()->mon_cluster_log_file_level, oss,
                               &channels.log_file_level,
                               CLOG_CONFIG_DEFAULT_KEY);
   if (r < 0) {
@@ -737,7 +760,7 @@ void LogMonitor::update_log_channels()
     return;
   }
 
-  r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog, oss,
+  r = get_conf_str_map_helper(g_conf()->mon_cluster_log_to_graylog, oss,
                               &channels.log_to_graylog,
                               CLOG_CONFIG_DEFAULT_KEY);
   if (r < 0) {
@@ -746,7 +769,7 @@ void LogMonitor::update_log_channels()
     return;
   }
 
-  r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog_host, oss,
+  r = get_conf_str_map_helper(g_conf()->mon_cluster_log_to_graylog_host, oss,
                               &channels.log_to_graylog_host,
                               CLOG_CONFIG_DEFAULT_KEY);
   if (r < 0) {
@@ -755,7 +778,7 @@ void LogMonitor::update_log_channels()
     return;
   }
 
-  r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog_port, oss,
+  r = get_conf_str_map_helper(g_conf()->mon_cluster_log_to_graylog_port, oss,
                               &channels.log_to_graylog_port,
                               CLOG_CONFIG_DEFAULT_KEY);
   if (r < 0) {
@@ -830,8 +853,8 @@ ceph::logging::Graylog::Ref LogMonitor::log_channel_info::get_graylog(
   if (graylogs.count(channel) == 0) {
     auto graylog(std::make_shared<ceph::logging::Graylog>("mon"));
 
-    graylog->set_fsid(g_conf->get_val<uuid_d>("fsid"));
-    graylog->set_hostname(g_conf->host);
+    graylog->set_fsid(g_conf().get_val<uuid_d>("fsid"));
+    graylog->set_hostname(g_conf()->host);
     graylog->set_destination(get_str_map_key(log_to_graylog_host, channel,
                                             &CLOG_CONFIG_DEFAULT_KEY),
                             atoi(get_str_map_key(log_to_graylog_port, channel,
@@ -847,7 +870,7 @@ ceph::logging::Graylog::Ref LogMonitor::log_channel_info::get_graylog(
   return graylogs[channel];
 }
 
-void LogMonitor::handle_conf_change(const struct md_config_t *conf,
+void LogMonitor::handle_conf_change(const ConfigProxy& conf,
                                     const std::set<std::string> &changed)
 {
   if (changed.count("mon_cluster_log_to_syslog") ||