]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mds/Beacon.cc
update sources to 12.2.10
[ceph.git] / ceph / src / mds / Beacon.cc
index 5750adc375927d23a4b6708691c56c0f4219554b..475169a1604e05311d6942194dd87e473f646555 100644 (file)
 
 #include "Beacon.h"
 
+#include <chrono>
+
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_mds
 #undef dout_prefix
 #define dout_prefix *_dout << "mds.beacon." << name << ' '
 
-
-Beacon::Beacon(CephContext *cct_, MonClient *monc_, boost::string_view name_) :
-  Dispatcher(cct_), lock("Beacon"), monc(monc_), timer(g_ceph_context, lock),
-  name(name_), standby_for_rank(MDS_RANK_NONE),
-  standby_for_fscid(FS_CLUSTER_ID_NONE), want_state(MDSMap::STATE_BOOT),
-  awaiting_seq(-1)
+Beacon::Beacon(CephContext *cct, MonClient *monc, boost::string_view name)
+  :
+    Dispatcher(cct),
+    beacon_interval(g_conf->mds_beacon_interval),
+    monc(monc),
+    name(name)
 {
-  last_seq = 0;
-  was_laggy = false;
-
-  epoch = 0;
 }
 
-
 Beacon::~Beacon()
 {
+  shutdown();
 }
 
+void Beacon::shutdown()
+{
+  std::unique_lock<std::mutex> lock(mutex);
+  if (!finished) {
+    finished = true;
+    lock.unlock();
+    sender.join();
+  }
+}
 
-void Beacon::init(MDSMap const *mdsmap)
+void Beacon::init(const MDSMap* mdsmap)
 {
-  Mutex::Locker l(lock);
+  std::unique_lock<std::mutex> lock(mutex);
   assert(mdsmap != NULL);
 
   _notify_mdsmap(mdsmap);
@@ -63,28 +70,42 @@ void Beacon::init(MDSMap const *mdsmap)
   standby_for_fscid = fs_cluster_id_t(g_conf->mds_standby_for_fscid);
   standby_replay = g_conf->mds_standby_replay;
 
-  // Spawn threads and start messaging
-  timer.init();
-  _send();
+  sender = std::thread([this]() {
+    std::unique_lock<std::mutex> lock(mutex);
+    std::condition_variable c; // no one wakes us
+    while (!finished) {
+      auto now = clock::now();
+      auto since = std::chrono::duration<double>(now-last_send).count();
+      auto interval = beacon_interval;
+      if (since >= interval*.90) {
+        _send();
+      } else {
+        interval -= since;
+      }
+      dout(20) << "sender thread waiting interval " << interval << "s" << dendl;
+      c.wait_for(lock, interval*std::chrono::seconds(1));
+    }
+  });
 }
 
-
-void Beacon::shutdown()
+bool Beacon::ms_can_fast_dispatch(const Message *m) const
 {
-  Mutex::Locker l(lock);
-  if (sender) {
-    timer.cancel_event(sender);
-    sender = NULL;
-  }
-  timer.shutdown();
+  return m->get_type() == MSG_MDS_BEACON;
 }
 
+void Beacon::ms_fast_dispatch(Message *m)
+{
+  bool handled = ms_dispatch(m);
+  assert(handled);
+}
 
 bool Beacon::ms_dispatch(Message *m)
 {
   if (m->get_type() == MSG_MDS_BEACON) {
     if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
       handle_mds_beacon(static_cast<MMDSBeacon*>(m));
+    } else {
+      m->put();
     }
     return true;
   }
@@ -100,47 +121,35 @@ bool Beacon::ms_dispatch(Message *m)
  */
 void Beacon::handle_mds_beacon(MMDSBeacon *m)
 {
-  Mutex::Locker l(lock);
+  std::unique_lock<std::mutex> lock(mutex);
   assert(m != NULL);
 
   version_t seq = m->get_seq();
 
   // update lab
-  if (seq_stamp.count(seq)) {
-    utime_t now = ceph_clock_now();
-    if (seq_stamp[seq] > last_acked_stamp) {
-      last_acked_stamp = seq_stamp[seq];
-      utime_t rtt = now - last_acked_stamp;
-
-      dout(10) << "handle_mds_beacon " << ceph_mds_state_name(m->get_state())
-              << " seq " << m->get_seq() << " rtt " << rtt << dendl;
-
-      if (was_laggy && rtt < g_conf->mds_beacon_grace) {
-       dout(0) << "handle_mds_beacon no longer laggy" << dendl;
-       was_laggy = false;
-       laggy_until = now;
-      }
-    } else {
-      // Mark myself laggy if system clock goes backwards. Hopping
-      // later beacons will clear it.
-      dout(1) << "handle_mds_beacon system clock goes backwards, "
-             << "mark myself laggy" << dendl;
-      last_acked_stamp = now - utime_t(g_conf->mds_beacon_grace + 1, 0);
-      was_laggy = true;
+  auto it = seq_stamp.find(seq);
+  if (it != seq_stamp.end()) {
+    auto now = clock::now();
+
+    last_acked_stamp = it->second;
+    auto rtt = std::chrono::duration<double>(now - last_acked_stamp).count();
+
+    dout(5) << "received beacon reply " << ceph_mds_state_name(m->get_state()) << " seq " << m->get_seq() << " rtt " << rtt << dendl;
+
+    if (laggy && rtt < g_conf->mds_beacon_grace) {
+      dout(0) << " MDS is no longer laggy" << dendl;
+      laggy = false;
+      last_laggy = now;
     }
 
     // clean up seq_stamp map
-    while (!seq_stamp.empty() &&
-          seq_stamp.begin()->first <= seq)
-      seq_stamp.erase(seq_stamp.begin());
+    seq_stamp.erase(seq_stamp.begin(), ++it);
 
     // Wake a waiter up if present
-    if (awaiting_seq == seq) {
-      waiting_cond.Signal();
-    }
+    cvar.notify_all();
   } else {
-    dout(10) << "handle_mds_beacon " << ceph_mds_state_name(m->get_state())
-            << " seq " << m->get_seq() << " dne" << dendl;
+    dout(1) << "discarding unexpected beacon reply " << ceph_mds_state_name(m->get_state())
+           << " seq " << m->get_seq() << " dne" << dendl;
   }
   m->put();
 }
@@ -148,27 +157,26 @@ void Beacon::handle_mds_beacon(MMDSBeacon *m)
 
 void Beacon::send()
 {
-  Mutex::Locker l(lock);
+  std::unique_lock<std::mutex> lock(mutex);
   _send();
 }
 
 
 void Beacon::send_and_wait(const double duration)
 {
-  Mutex::Locker l(lock);
+  std::unique_lock<std::mutex> lock(mutex);
   _send();
-  awaiting_seq = last_seq;
+  auto awaiting_seq = last_seq;
   dout(20) << __func__ << ": awaiting " << awaiting_seq
            << " for up to " << duration << "s" << dendl;
 
-  utime_t timeout;
-  timeout.set_from_double(ceph_clock_now() + duration);
-  while ((!seq_stamp.empty() && seq_stamp.begin()->first <= awaiting_seq)
-         && ceph_clock_now() < timeout) {
-    waiting_cond.WaitUntil(lock, timeout);
+  auto start = clock::now();
+  while (!seq_stamp.empty() && seq_stamp.begin()->first <= awaiting_seq) {
+    auto now = clock::now();
+    auto s = duration*.95-std::chrono::duration<double>(now-start).count();
+    if (s < 0) break;
+    cvar.wait_for(lock, s*std::chrono::seconds(1));
   }
-
-  awaiting_seq = -1;
 }
 
 
@@ -177,30 +185,20 @@ void Beacon::send_and_wait(const double duration)
  */
 void Beacon::_send()
 {
-  if (sender) {
-    timer.cancel_event(sender);
-  }
-  sender = timer.add_event_after(
-    g_conf->mds_beacon_interval,
-    new FunctionContext([this](int) {
-       assert(lock.is_locked_by_me());
-       sender = nullptr;
-       _send();
-      }));
+  auto now = clock::now();
+  auto since = std::chrono::duration<double>(now-last_acked_stamp).count();
 
   if (!cct->get_heartbeat_map()->is_healthy()) {
     /* If anything isn't progressing, let avoid sending a beacon so that
      * the MDS will consider us laggy */
-    dout(1) << __func__ << " skipping beacon, heartbeat map not healthy" << dendl;
+    dout(0) << "Skipping beacon heartbeat to monitors (last acked " << since << "s ago); MDS internal heartbeat is not healthy!" << dendl;
     return;
   }
 
   ++last_seq;
-  dout(10) << __func__ << " " << ceph_mds_state_name(want_state)
-          << " seq " << last_seq
-          << dendl;
+  dout(5) << "Sending beacon " << ceph_mds_state_name(want_state) << " seq " << last_seq << dendl;
 
-  seq_stamp[last_seq] = ceph_clock_now();
+  seq_stamp[last_seq] = now;
 
   assert(want_state != MDSMap::STATE_NULL);
   
@@ -226,6 +224,7 @@ void Beacon::_send()
     beacon->set_sys_info(sys_info);
   }
   monc->send_mon_message(beacon);
+  last_send = now;
 }
 
 /**
@@ -233,7 +232,7 @@ void Beacon::_send()
  */
 void Beacon::notify_mdsmap(MDSMap const *mdsmap)
 {
-  Mutex::Locker l(lock);
+  std::unique_lock<std::mutex> lock(mutex);
   assert(mdsmap != NULL);
 
   _notify_mdsmap(mdsmap);
@@ -254,21 +253,20 @@ void Beacon::_notify_mdsmap(MDSMap const *mdsmap)
 
 bool Beacon::is_laggy()
 {
-  Mutex::Locker l(lock);
-
-  if (last_acked_stamp == utime_t())
-    return false;
+  std::unique_lock<std::mutex> lock(mutex);
 
-  utime_t now = ceph_clock_now();
-  utime_t since = now - last_acked_stamp;
+  auto now = clock::now();
+  auto since = std::chrono::duration<double>(now-last_acked_stamp).count();
   if (since > g_conf->mds_beacon_grace) {
-    dout(5) << "is_laggy " << since << " > " << g_conf->mds_beacon_grace
-           << " since last acked beacon" << dendl;
-    was_laggy = true;
-    if (since > (g_conf->mds_beacon_grace*2) &&
-       now > last_mon_reconnect + g_conf->mds_beacon_interval) {
+    if (!laggy) {
+      dout(1) << "is_laggy " << since << " > " << g_conf->mds_beacon_grace
+             << " since last acked beacon" << dendl;
+    }
+    laggy = true;
+    auto last_reconnect = std::chrono::duration<double>(now-last_mon_reconnect).count();
+    if (since > (g_conf->mds_beacon_grace*2) && last_reconnect > g_conf->mds_beacon_interval) {
       // maybe it's not us?
-      dout(5) << "initiating monitor reconnect; maybe we're not the slow one"
+      dout(1) << "initiating monitor reconnect; maybe we're not the slow one"
               << dendl;
       last_mon_reconnect = now;
       monc->reopen_session();
@@ -278,16 +276,9 @@ bool Beacon::is_laggy()
   return false;
 }
 
-utime_t Beacon::get_laggy_until() const
-{
-  Mutex::Locker l(lock);
-
-  return laggy_until;
-}
-
-void Beacon::set_want_state(MDSMap const *mdsmap, MDSMap::DaemonState const newstate)
+void Beacon::set_want_state(const MDSMap* mdsmap, MDSMap::DaemonState const newstate)
 {
-  Mutex::Locker l(lock);
+  std::unique_lock<std::mutex> lock(mutex);
 
   // Update mdsmap epoch atomically with updating want_state, so that when
   // we send a beacon with the new want state it has the latest epoch, and
@@ -297,7 +288,7 @@ void Beacon::set_want_state(MDSMap const *mdsmap, MDSMap::DaemonState const news
   _notify_mdsmap(mdsmap);
 
   if (want_state != newstate) {
-    dout(10) << __func__ << ": "
+    dout(5) << __func__ << ": "
       << ceph_mds_state_name(want_state) << " -> "
       << ceph_mds_state_name(newstate) << dendl;
     want_state = newstate;
@@ -312,7 +303,7 @@ void Beacon::set_want_state(MDSMap const *mdsmap, MDSMap::DaemonState const news
  */
 void Beacon::notify_health(MDSRank const *mds)
 {
-  Mutex::Locker l(lock);
+  std::unique_lock<std::mutex> lock(mutex);
   if (!mds) {
     // No MDS rank held
     return;
@@ -349,7 +340,8 @@ void Beacon::notify_health(MDSRank const *mds)
   // CLIENT_CAPS messages.
   {
     std::list<client_t> late_clients;
-    mds->locker->get_late_revoking_clients(&late_clients);
+    mds->locker->get_late_revoking_clients(&late_clients,
+                                           mds->mdsmap->get_session_timeout());
     std::list<MDSHealthMetric> late_cap_metrics;
 
     for (std::list<client_t>::iterator i = late_clients.begin(); i != late_clients.end(); ++i) {
@@ -393,30 +385,34 @@ void Beacon::notify_health(MDSRank const *mds)
     set<Session*> sessions;
     mds->sessionmap.get_client_session_set(sessions);
 
-    utime_t cutoff = ceph_clock_now();
-    cutoff -= g_conf->mds_recall_state_timeout;
-    utime_t last_recall = mds->mdcache->last_recall_state;
+    auto mds_recall_state_timeout = g_conf->mds_recall_state_timeout;
+    auto last_recall = mds->mdcache->last_recall_state;
+    auto last_recall_span = std::chrono::duration<double>(clock::now()-last_recall).count();
+    bool recall_state_timedout = last_recall_span > mds_recall_state_timeout;
 
     std::list<MDSHealthMetric> late_recall_metrics;
     std::list<MDSHealthMetric> large_completed_requests_metrics;
-    for (set<Session*>::iterator i = sessions.begin(); i != sessions.end(); ++i) {
-      Session *session = *i;
-      if (!session->recalled_at.is_zero()) {
+    for (auto& session : sessions) {
+      if (session->recalled_at != Session::time::min()) {
+        auto last_recall_sent = session->last_recall_sent;
+        auto recalled_at = session->recalled_at;
+        auto recalled_at_span = std::chrono::duration<double>(clock::now()-recalled_at).count();
+
         dout(20) << "Session servicing RECALL " << session->info.inst
-          << ": " << session->recalled_at << " " << session->recall_release_count
+          << ": " << recalled_at_span << "s ago " << session->recall_release_count
           << "/" << session->recall_count << dendl;
-       if (last_recall < cutoff || session->last_recall_sent < last_recall) {
+       if (recall_state_timedout || last_recall_sent < last_recall) {
          dout(20) << "  no longer recall" << dendl;
          session->clear_recalled_at();
-       } else if (session->recalled_at < cutoff) {
-          dout(20) << "  exceeded timeout " << session->recalled_at << " vs. " << cutoff << dendl;
+       } else if (recalled_at_span > mds_recall_state_timeout) {
+          dout(20) << "  exceeded timeout " << recalled_at_span << " vs. " << mds_recall_state_timeout << dendl;
           std::ostringstream oss;
          oss << "Client " << session->get_human_name() << " failing to respond to cache pressure";
           MDSHealthMetric m(MDS_HEALTH_CLIENT_RECALL, HEALTH_WARN, oss.str());
           m.metadata["client_id"] = stringify(session->info.inst.name.num());
           late_recall_metrics.push_back(m);
         } else {
-          dout(20) << "  within timeout " << session->recalled_at << " vs. " << cutoff << dendl;
+          dout(20) << "  within timeout " << recalled_at_span << " vs. " << mds_recall_state_timeout << dendl;
         }
       }
       if ((session->get_num_trim_requests_warnings() > 0 &&
@@ -459,8 +455,8 @@ void Beacon::notify_health(MDSRank const *mds)
   // Detect MDS_HEALTH_SLOW_REQUEST condition
   {
     int slow = mds->get_mds_slow_req_count();
-    dout(20) << slow << " slow request found" << dendl;
     if (slow) {
+      dout(20) << slow << " slow request found" << dendl;
       std::ostringstream oss;
       oss << slow << " slow requests are blocked > " << g_conf->mds_op_complaint_time << " sec";
 
@@ -469,6 +465,26 @@ void Beacon::notify_health(MDSRank const *mds)
     }
   }
 
+  {
+    auto complaint_time = g_conf->osd_op_complaint_time;
+    auto now = clock::now();
+    auto cutoff = now - ceph::make_timespan(complaint_time);
+
+    std::string count;
+    ceph::coarse_mono_time oldest;
+    if (MDSIOContextBase::check_ios_in_flight(cutoff, count, oldest)) {
+      dout(20) << count << " slow metadata IOs found" << dendl;
+
+      auto oldest_secs = std::chrono::duration<double>(now - oldest).count();
+      std::ostringstream oss;
+      oss << count << " slow metadata IOs are blocked > " << complaint_time
+         << " secs, oldest blocked for " << (int64_t)oldest_secs << " secs";
+
+      MDSHealthMetric m(MDS_HEALTH_SLOW_METADATA_IO, HEALTH_WARN, oss.str());
+      health.metrics.push_back(m);
+    }
+  }
+
   // Report a health warning if we are readonly
   if (mds->mdcache->is_readonly()) {
     MDSHealthMetric m(MDS_HEALTH_READ_ONLY, HEALTH_WARN,
@@ -491,7 +507,7 @@ void Beacon::notify_health(MDSRank const *mds)
 
 MDSMap::DaemonState Beacon::get_want_state() const
 {
-  Mutex::Locker l(lock);
+  std::unique_lock<std::mutex> lock(mutex);
   return want_state;
 }