#include "Beacon.h"
+#include <chrono>
+
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mds
#undef dout_prefix
#define dout_prefix *_dout << "mds.beacon." << name << ' '
-
-Beacon::Beacon(CephContext *cct_, MonClient *monc_, boost::string_view name_) :
- Dispatcher(cct_), lock("Beacon"), monc(monc_), timer(g_ceph_context, lock),
- name(name_), standby_for_rank(MDS_RANK_NONE),
- standby_for_fscid(FS_CLUSTER_ID_NONE), want_state(MDSMap::STATE_BOOT),
- awaiting_seq(-1)
+Beacon::Beacon(CephContext *cct, MonClient *monc, boost::string_view name)
+ :
+ Dispatcher(cct),
+ beacon_interval(g_conf->mds_beacon_interval),
+ monc(monc),
+ name(name)
{
- last_seq = 0;
- was_laggy = false;
-
- epoch = 0;
}
-
Beacon::~Beacon()
{
+ shutdown();
}
+/**
+ * Stop the beacon sender thread.
+ *
+ * Safe to call more than once (the destructor calls it as well): the first
+ * call marks the beacon finished and joins the sender thread; later calls
+ * find `finished` already set and return immediately.
+ *
+ * The join is done with the mutex released because the sender thread
+ * acquires the same mutex. The sender waits on a private condition variable
+ * that nothing notifies, so this call may block until that timed wait
+ * returns and the sender re-checks `finished`.
+ */
+void Beacon::shutdown()
+{
+  std::unique_lock<std::mutex> lock(mutex);
+  if (finished) {
+    return;
+  }
+  finished = true;
+  lock.unlock();
+  // The sender thread is only started by init(). If init() never ran,
+  // `sender` is not joinable and join() would throw std::system_error,
+  // which is fatal when we are reached from the destructor.
+  if (sender.joinable()) {
+    sender.join();
+  }
+}
-void Beacon::init(MDSMap const *mdsmap)
+void Beacon::init(const MDSMap* mdsmap)
{
- Mutex::Locker l(lock);
+ std::unique_lock<std::mutex> lock(mutex);
assert(mdsmap != NULL);
_notify_mdsmap(mdsmap);
standby_for_fscid = fs_cluster_id_t(g_conf->mds_standby_for_fscid);
standby_replay = g_conf->mds_standby_replay;
- // Spawn threads and start messaging
- timer.init();
- _send();
+ sender = std::thread([this]() {
+ std::unique_lock<std::mutex> lock(mutex);
+ std::condition_variable c; // no one wakes us
+ while (!finished) {
+ auto now = clock::now();
+ auto since = std::chrono::duration<double>(now-last_send).count();
+ auto interval = beacon_interval;
+ if (since >= interval*.90) {
+ _send();
+ } else {
+ interval -= since;
+ }
+ dout(20) << "sender thread waiting interval " << interval << "s" << dendl;
+ c.wait_for(lock, interval*std::chrono::seconds(1));
+ }
+ });
}
-
-void Beacon::shutdown()
+bool Beacon::ms_can_fast_dispatch(const Message *m) const
{
- Mutex::Locker l(lock);
- if (sender) {
- timer.cancel_event(sender);
- sender = NULL;
- }
- timer.shutdown();
+ return m->get_type() == MSG_MDS_BEACON;
}
+/**
+ * Fast-dispatch entry point: hands the message to ms_dispatch() and asserts
+ * that it reported the message as handled.
+ */
+void Beacon::ms_fast_dispatch(Message *m)
+{
+  const bool consumed = ms_dispatch(m);
+  assert(consumed);
+}
bool Beacon::ms_dispatch(Message *m)
{
if (m->get_type() == MSG_MDS_BEACON) {
if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
handle_mds_beacon(static_cast<MMDSBeacon*>(m));
+ } else {
+ m->put();
}
return true;
}
*/
void Beacon::handle_mds_beacon(MMDSBeacon *m)
{
- Mutex::Locker l(lock);
+ std::unique_lock<std::mutex> lock(mutex);
assert(m != NULL);
version_t seq = m->get_seq();
// update lab
- if (seq_stamp.count(seq)) {
- utime_t now = ceph_clock_now();
- if (seq_stamp[seq] > last_acked_stamp) {
- last_acked_stamp = seq_stamp[seq];
- utime_t rtt = now - last_acked_stamp;
-
- dout(10) << "handle_mds_beacon " << ceph_mds_state_name(m->get_state())
- << " seq " << m->get_seq() << " rtt " << rtt << dendl;
-
- if (was_laggy && rtt < g_conf->mds_beacon_grace) {
- dout(0) << "handle_mds_beacon no longer laggy" << dendl;
- was_laggy = false;
- laggy_until = now;
- }
- } else {
- // Mark myself laggy if system clock goes backwards. Hopping
- // later beacons will clear it.
- dout(1) << "handle_mds_beacon system clock goes backwards, "
- << "mark myself laggy" << dendl;
- last_acked_stamp = now - utime_t(g_conf->mds_beacon_grace + 1, 0);
- was_laggy = true;
+ auto it = seq_stamp.find(seq);
+ if (it != seq_stamp.end()) {
+ auto now = clock::now();
+
+ last_acked_stamp = it->second;
+ auto rtt = std::chrono::duration<double>(now - last_acked_stamp).count();
+
+ dout(5) << "received beacon reply " << ceph_mds_state_name(m->get_state()) << " seq " << m->get_seq() << " rtt " << rtt << dendl;
+
+ if (laggy && rtt < g_conf->mds_beacon_grace) {
+ dout(0) << " MDS is no longer laggy" << dendl;
+ laggy = false;
+ last_laggy = now;
}
// clean up seq_stamp map
- while (!seq_stamp.empty() &&
- seq_stamp.begin()->first <= seq)
- seq_stamp.erase(seq_stamp.begin());
+ seq_stamp.erase(seq_stamp.begin(), ++it);
// Wake a waiter up if present
- if (awaiting_seq == seq) {
- waiting_cond.Signal();
- }
+ cvar.notify_all();
} else {
- dout(10) << "handle_mds_beacon " << ceph_mds_state_name(m->get_state())
- << " seq " << m->get_seq() << " dne" << dendl;
+ dout(1) << "discarding unexpected beacon reply " << ceph_mds_state_name(m->get_state())
+ << " seq " << m->get_seq() << " dne" << dendl;
}
m->put();
}
void Beacon::send()
{
- Mutex::Locker l(lock);
+ std::unique_lock<std::mutex> lock(mutex);
_send();
}
void Beacon::send_and_wait(const double duration)
{
- Mutex::Locker l(lock);
+ std::unique_lock<std::mutex> lock(mutex);
_send();
- awaiting_seq = last_seq;
+ auto awaiting_seq = last_seq;
dout(20) << __func__ << ": awaiting " << awaiting_seq
<< " for up to " << duration << "s" << dendl;
- utime_t timeout;
- timeout.set_from_double(ceph_clock_now() + duration);
- while ((!seq_stamp.empty() && seq_stamp.begin()->first <= awaiting_seq)
- && ceph_clock_now() < timeout) {
- waiting_cond.WaitUntil(lock, timeout);
+ auto start = clock::now();
+ while (!seq_stamp.empty() && seq_stamp.begin()->first <= awaiting_seq) {
+ auto now = clock::now();
+ auto s = duration*.95-std::chrono::duration<double>(now-start).count();
+ if (s < 0) break;
+ cvar.wait_for(lock, s*std::chrono::seconds(1));
}
-
- awaiting_seq = -1;
}
*/
void Beacon::_send()
{
- if (sender) {
- timer.cancel_event(sender);
- }
- sender = timer.add_event_after(
- g_conf->mds_beacon_interval,
- new FunctionContext([this](int) {
- assert(lock.is_locked_by_me());
- sender = nullptr;
- _send();
- }));
+ auto now = clock::now();
+ auto since = std::chrono::duration<double>(now-last_acked_stamp).count();
if (!cct->get_heartbeat_map()->is_healthy()) {
/* If anything isn't progressing, let avoid sending a beacon so that
* the MDS will consider us laggy */
- dout(1) << __func__ << " skipping beacon, heartbeat map not healthy" << dendl;
+ dout(0) << "Skipping beacon heartbeat to monitors (last acked " << since << "s ago); MDS internal heartbeat is not healthy!" << dendl;
return;
}
++last_seq;
- dout(10) << __func__ << " " << ceph_mds_state_name(want_state)
- << " seq " << last_seq
- << dendl;
+ dout(5) << "Sending beacon " << ceph_mds_state_name(want_state) << " seq " << last_seq << dendl;
- seq_stamp[last_seq] = ceph_clock_now();
+ seq_stamp[last_seq] = now;
assert(want_state != MDSMap::STATE_NULL);
beacon->set_sys_info(sys_info);
}
monc->send_mon_message(beacon);
+ last_send = now;
}
/**
*/
void Beacon::notify_mdsmap(MDSMap const *mdsmap)
{
- Mutex::Locker l(lock);
+ std::unique_lock<std::mutex> lock(mutex);
assert(mdsmap != NULL);
_notify_mdsmap(mdsmap);
bool Beacon::is_laggy()
{
- Mutex::Locker l(lock);
-
- if (last_acked_stamp == utime_t())
- return false;
+ std::unique_lock<std::mutex> lock(mutex);
- utime_t now = ceph_clock_now();
- utime_t since = now - last_acked_stamp;
+ auto now = clock::now();
+ auto since = std::chrono::duration<double>(now-last_acked_stamp).count();
if (since > g_conf->mds_beacon_grace) {
- dout(5) << "is_laggy " << since << " > " << g_conf->mds_beacon_grace
- << " since last acked beacon" << dendl;
- was_laggy = true;
- if (since > (g_conf->mds_beacon_grace*2) &&
- now > last_mon_reconnect + g_conf->mds_beacon_interval) {
+ if (!laggy) {
+ dout(1) << "is_laggy " << since << " > " << g_conf->mds_beacon_grace
+ << " since last acked beacon" << dendl;
+ }
+ laggy = true;
+ auto last_reconnect = std::chrono::duration<double>(now-last_mon_reconnect).count();
+ if (since > (g_conf->mds_beacon_grace*2) && last_reconnect > g_conf->mds_beacon_interval) {
// maybe it's not us?
- dout(5) << "initiating monitor reconnect; maybe we're not the slow one"
+ dout(1) << "initiating monitor reconnect; maybe we're not the slow one"
<< dendl;
last_mon_reconnect = now;
monc->reopen_session();
return false;
}
-utime_t Beacon::get_laggy_until() const
-{
- Mutex::Locker l(lock);
-
- return laggy_until;
-}
-
-void Beacon::set_want_state(MDSMap const *mdsmap, MDSMap::DaemonState const newstate)
+void Beacon::set_want_state(const MDSMap* mdsmap, MDSMap::DaemonState const newstate)
{
- Mutex::Locker l(lock);
+ std::unique_lock<std::mutex> lock(mutex);
// Update mdsmap epoch atomically with updating want_state, so that when
// we send a beacon with the new want state it has the latest epoch, and
_notify_mdsmap(mdsmap);
if (want_state != newstate) {
- dout(10) << __func__ << ": "
+ dout(5) << __func__ << ": "
<< ceph_mds_state_name(want_state) << " -> "
<< ceph_mds_state_name(newstate) << dendl;
want_state = newstate;
*/
void Beacon::notify_health(MDSRank const *mds)
{
- Mutex::Locker l(lock);
+ std::unique_lock<std::mutex> lock(mutex);
if (!mds) {
// No MDS rank held
return;
// CLIENT_CAPS messages.
{
std::list<client_t> late_clients;
- mds->locker->get_late_revoking_clients(&late_clients);
+ mds->locker->get_late_revoking_clients(&late_clients,
+ mds->mdsmap->get_session_timeout());
std::list<MDSHealthMetric> late_cap_metrics;
for (std::list<client_t>::iterator i = late_clients.begin(); i != late_clients.end(); ++i) {
set<Session*> sessions;
mds->sessionmap.get_client_session_set(sessions);
- utime_t cutoff = ceph_clock_now();
- cutoff -= g_conf->mds_recall_state_timeout;
- utime_t last_recall = mds->mdcache->last_recall_state;
+ auto mds_recall_state_timeout = g_conf->mds_recall_state_timeout;
+ auto last_recall = mds->mdcache->last_recall_state;
+ auto last_recall_span = std::chrono::duration<double>(clock::now()-last_recall).count();
+ bool recall_state_timedout = last_recall_span > mds_recall_state_timeout;
std::list<MDSHealthMetric> late_recall_metrics;
std::list<MDSHealthMetric> large_completed_requests_metrics;
- for (set<Session*>::iterator i = sessions.begin(); i != sessions.end(); ++i) {
- Session *session = *i;
- if (!session->recalled_at.is_zero()) {
+ for (auto& session : sessions) {
+ if (session->recalled_at != Session::time::min()) {
+ auto last_recall_sent = session->last_recall_sent;
+ auto recalled_at = session->recalled_at;
+ auto recalled_at_span = std::chrono::duration<double>(clock::now()-recalled_at).count();
+
dout(20) << "Session servicing RECALL " << session->info.inst
- << ": " << session->recalled_at << " " << session->recall_release_count
+ << ": " << recalled_at_span << "s ago " << session->recall_release_count
<< "/" << session->recall_count << dendl;
- if (last_recall < cutoff || session->last_recall_sent < last_recall) {
+ if (recall_state_timedout || last_recall_sent < last_recall) {
dout(20) << " no longer recall" << dendl;
session->clear_recalled_at();
- } else if (session->recalled_at < cutoff) {
- dout(20) << " exceeded timeout " << session->recalled_at << " vs. " << cutoff << dendl;
+ } else if (recalled_at_span > mds_recall_state_timeout) {
+ dout(20) << " exceeded timeout " << recalled_at_span << " vs. " << mds_recall_state_timeout << dendl;
std::ostringstream oss;
oss << "Client " << session->get_human_name() << " failing to respond to cache pressure";
MDSHealthMetric m(MDS_HEALTH_CLIENT_RECALL, HEALTH_WARN, oss.str());
m.metadata["client_id"] = stringify(session->info.inst.name.num());
late_recall_metrics.push_back(m);
} else {
- dout(20) << " within timeout " << session->recalled_at << " vs. " << cutoff << dendl;
+ dout(20) << " within timeout " << recalled_at_span << " vs. " << mds_recall_state_timeout << dendl;
}
}
if ((session->get_num_trim_requests_warnings() > 0 &&
// Detect MDS_HEALTH_SLOW_REQUEST condition
{
int slow = mds->get_mds_slow_req_count();
- dout(20) << slow << " slow request found" << dendl;
if (slow) {
+ dout(20) << slow << " slow request found" << dendl;
std::ostringstream oss;
oss << slow << " slow requests are blocked > " << g_conf->mds_op_complaint_time << " sec";
}
}
+ {
+ auto complaint_time = g_conf->osd_op_complaint_time;
+ auto now = clock::now();
+ auto cutoff = now - ceph::make_timespan(complaint_time);
+
+ std::string count;
+ ceph::coarse_mono_time oldest;
+ if (MDSIOContextBase::check_ios_in_flight(cutoff, count, oldest)) {
+ dout(20) << count << " slow metadata IOs found" << dendl;
+
+ auto oldest_secs = std::chrono::duration<double>(now - oldest).count();
+ std::ostringstream oss;
+ oss << count << " slow metadata IOs are blocked > " << complaint_time
+ << " secs, oldest blocked for " << (int64_t)oldest_secs << " secs";
+
+ MDSHealthMetric m(MDS_HEALTH_SLOW_METADATA_IO, HEALTH_WARN, oss.str());
+ health.metrics.push_back(m);
+ }
+ }
+
// Report a health warning if we are readonly
if (mds->mdcache->is_readonly()) {
MDSHealthMetric m(MDS_HEALTH_READ_ONLY, HEALTH_WARN,
MDSMap::DaemonState Beacon::get_want_state() const
{
- Mutex::Locker l(lock);
+ std::unique_lock<std::mutex> lock(mutex);
return want_state;
}