]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osd/PG.cc
import ceph quincy 17.2.1
[ceph.git] / ceph / src / osd / PG.cc
index df8c4f3451d4adce2a00aba69017727520e84b14..d77cef0d7b5c552b1156a2124b56dcea82c65a7d 100644 (file)
 #include "common/config.h"
 #include "OSD.h"
 #include "OpRequest.h"
-#include "ScrubStore.h"
-#include "pg_scrubber.h"
-#include "Session.h"
+#include "osd/scrubber/ScrubStore.h"
+#include "osd/scrubber/pg_scrubber.h"
 #include "osd/scheduler/OpSchedulerItem.h"
+#include "Session.h"
 
 #include "common/Timer.h"
 #include "common/perf_counters.h"
 
 #include "messages/MOSDOp.h"
-#include "messages/MOSDPGNotify.h"
-#include "messages/MOSDPGInfo.h"
 #include "messages/MOSDPGScan.h"
 #include "messages/MOSDPGBackfill.h"
 #include "messages/MOSDPGBackfillRemove.h"
@@ -138,7 +136,7 @@ uint64_t PG::get_with_id()
   ref++;
   std::lock_guard l(_ref_id_lock);
   uint64_t id = ++_ref_id;
-  BackTrace bt(0);
+  ClibBackTrace bt(0);
   stringstream ss;
   bt.print(ss);
   lgeneric_subdout(cct, refs, 5) << "PG::get " << this << " " << info.pgid
@@ -203,11 +201,9 @@ PG::PG(OSDService *o, OSDMapRef curmap,
   info_struct_v(0),
   pgmeta_oid(p.make_pgmeta_oid()),
   stat_queue_item(this),
-  scrub_queued(false),
   recovery_queued(false),
   recovery_ops_active(0),
   backfill_reserving(false),
-  pg_stats_publish_valid(false),
   finish_sync_event(NULL),
   scrub_after_recovery(false),
   active_pushes(0),
@@ -436,19 +432,15 @@ void PG::queue_scrub_after_repair()
   m_planned_scrub.check_repair = true;
   m_planned_scrub.must_scrub = true;
 
-  if (is_scrubbing()) {
-    dout(10) << __func__ << ": scrubbing already" << dendl;
-    return;
-  }
-  if (scrub_queued) {
-    dout(10) << __func__ << ": already queued" << dendl;
+  if (is_scrub_queued_or_active()) {
+    dout(10) << __func__ << ": scrubbing already ("
+             << (is_scrubbing() ? "active)" : "queued)") << dendl;
     return;
   }
 
   m_scrubber->set_op_parameters(m_planned_scrub);
   dout(15) << __func__ << ": queueing" << dendl;
 
-  scrub_queued = true;
   osd->queue_scrub_after_repair(this, Scrub::scrub_prio_t::high_priority);
 }
 
@@ -636,8 +628,8 @@ void PG::release_backoffs(const hobject_t& begin, const hobject_t& end)
       auto q = p->second.begin();
       while (q != p->second.end()) {
        dout(20) << __func__ << " checking  " << *q << dendl;
-       int r = cmp((*q)->begin, begin);
-       if (r == 0 || (r > 0 && (*q)->end < end)) {
+       int rr = cmp((*q)->begin, begin);
+       if (rr == 0 || (rr > 0 && (*q)->end < end)) {
          bv.push_back(*q);
          q = p->second.erase(q);
        } else {
@@ -763,7 +755,7 @@ void PG::set_probe_targets(const set<pg_shard_t> &probe_set)
 
 void PG::send_cluster_message(
   int target, MessageRef m,
-  epoch_t epoch, bool share_map_update=false)
+  epoch_t epoch, bool share_map_update)
 {
   ConnectionRef con = osd->get_con_osd_cluster(
     target, get_osdmap_epoch());
@@ -821,14 +813,18 @@ void PG::publish_stats_to_osd()
   if (!is_primary())
     return;
 
+  ceph_assert(m_scrubber);
+  recovery_state.update_stats_wo_resched(
+    [scrubber = m_scrubber.get()](pg_history_t& hist,
+                                  pg_stat_t& info) mutable -> void {
+      info.scrub_sched_status = scrubber->get_schedule();
+    });
+
   std::lock_guard l{pg_stats_publish_lock};
-  auto stats = recovery_state.prepare_stats_for_publish(
-    pg_stats_publish_valid,
-    pg_stats_publish,
-    unstable_stats);
+  auto stats =
+    recovery_state.prepare_stats_for_publish(pg_stats_publish, unstable_stats);
   if (stats) {
-    pg_stats_publish = stats.value();
-    pg_stats_publish_valid = true;
+    pg_stats_publish = std::move(stats);
   }
 }
 
@@ -841,7 +837,7 @@ void PG::clear_publish_stats()
 {
   dout(15) << "clear_stats" << dendl;
   std::lock_guard l{pg_stats_publish_lock};
-  pg_stats_publish_valid = false;
+  pg_stats_publish.reset();
 }
 
 /**
@@ -864,12 +860,11 @@ void PG::init(
   const vector<int>& newacting, int new_acting_primary,
   const pg_history_t& history,
   const PastIntervals& pi,
-  bool backfill,
   ObjectStore::Transaction &t)
 {
   recovery_state.init(
     role, newup, new_up_primary, newacting,
-    new_acting_primary, history, pi, backfill, t);
+    new_acting_primary, history, pi, t);
 }
 
 void PG::shutdown()
@@ -990,7 +985,6 @@ int PG::peek_map_epoch(ObjectStore *store,
                       epoch_t *pepoch)
 {
   coll_t coll(pgid);
-  ghobject_t legacy_infos_oid(OSD::make_infos_oid());
   ghobject_t pgmeta_oid(pgid.make_pgmeta_oid());
   epoch_t cur_epoch = 0;
 
@@ -1151,7 +1145,7 @@ void PG::read_state(ObjectStore *store)
   // init pool options
   store->set_collection_opts(ch, pool.info.opts);
 
-  PeeringCtx rctx(ceph_release_t::unknown);
+  PeeringCtx rctx;
   handle_initialize(rctx);
   // note: we don't activate here because we know the OSD will advance maps
   // during boot.
@@ -1330,23 +1324,20 @@ unsigned int PG::scrub_requeue_priority(Scrub::scrub_prio_t with_priority, unsig
  *  Unless failing to start scrubbing, the 'planned scrub' flag-set is 'frozen' into
  *  PgScrubber's m_flags, then cleared.
  */
-bool PG::sched_scrub()
+Scrub::schedule_result_t PG::sched_scrub()
 {
   dout(15) << __func__ << " pg(" << info.pgid
          << (is_active() ? ") <active>" : ") <not-active>")
          << (is_clean() ? " <clean>" : " <not-clean>") << dendl;
   ceph_assert(ceph_mutex_is_locked(_lock));
-  ceph_assert(!is_scrubbing());
+  ceph_assert(m_scrubber);
 
-  if (!is_primary() || !is_active() || !is_clean()) {
-    return false;
+  if (is_scrub_queued_or_active()) {
+    return Scrub::schedule_result_t::already_started;
   }
 
-  if (scrub_queued) {
-    // only applicable to the very first time a scrub event is queued
-    // (until handled and posted to the scrub FSM)
-    dout(10) << __func__ << ": already queued" << dendl;
-    return false;
+  if (!is_primary() || !is_active() || !is_clean()) {
+    return Scrub::schedule_result_t::bad_pg_state;
   }
 
   // analyse the combination of the requested scrub flags, the osd/pool configuration
@@ -1358,14 +1349,14 @@ bool PG::sched_scrub()
     // (due to configuration or priority issues)
     // The reason was already reported by the callee.
     dout(10) << __func__ << ": failed to initiate a scrub" << dendl;
-    return false;
+    return Scrub::schedule_result_t::preconditions;
   }
 
   // try to reserve the local OSD resources. If failing: no harm. We will
   // be retried by the OSD later on.
   if (!m_scrubber->reserve_local()) {
     dout(10) << __func__ << ": failed to reserve locally" << dendl;
-    return false;
+    return Scrub::schedule_result_t::no_local_resources;
   }
 
   // can commit to the updated flags now, as nothing will stop the scrub
@@ -1379,10 +1370,8 @@ bool PG::sched_scrub()
   m_scrubber->set_op_parameters(m_planned_scrub);
 
   dout(10) << __func__ << ": queueing" << dendl;
-
-  scrub_queued = true;
   osd->queue_for_scrub(this, Scrub::scrub_prio_t::low_priority);
-  return true;
+  return Scrub::schedule_result_t::scrub_initiated;
 }
 
 double PG::next_deepscrub_interval() const
@@ -1399,7 +1388,8 @@ bool PG::is_time_for_deep(bool allow_deep_scrub,
                          bool has_deep_errors,
                          const requested_scrub_t& planned) const
 {
-  dout(10) << __func__ << ": need_auto?" << planned.need_auto << " allow_deep_scrub? " << allow_deep_scrub << dendl;
+  dout(10) << __func__ << ": need_auto?" << planned.need_auto << " allow_deep_scrub? "
+          << allow_deep_scrub << dendl;
 
   if (!allow_deep_scrub)
     return false;
@@ -1409,8 +1399,11 @@ bool PG::is_time_for_deep(bool allow_deep_scrub,
     return true;
   }
 
-  if (ceph_clock_now() >= next_deepscrub_interval())
+  if (ceph_clock_now() >= next_deepscrub_interval()) {
+    dout(20) << __func__ << ": now (" << ceph_clock_now() << ") >= time for deep ("
+            << next_deepscrub_interval() << ")" << dendl;
     return true;
+  }
 
   if (has_deep_errors) {
     osd->clog->info() << "osd." << osd->whoami << " pg " << info.pgid
@@ -1481,15 +1474,22 @@ bool PG::verify_periodic_scrub_mode(bool allow_deep_scrub,
 
 std::optional<requested_scrub_t> PG::verify_scrub_mode() const
 {
-  dout(10) << __func__ << " processing pg " << info.pgid << dendl;
-
-  bool allow_deep_scrub = !(get_osdmap()->test_flag(CEPH_OSDMAP_NODEEP_SCRUB) ||
-                           pool.info.has_flag(pg_pool_t::FLAG_NODEEP_SCRUB));
-  bool allow_regular_scrub = !(get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB) ||
-                              pool.info.has_flag(pg_pool_t::FLAG_NOSCRUB));
-  bool has_deep_errors = (info.stats.stats.sum.num_deep_scrub_errors > 0);
-  bool try_to_auto_repair =
-    (cct->_conf->osd_scrub_auto_repair && get_pgbackend()->auto_repair_supported());
+  const bool allow_regular_scrub =
+    !(get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB) ||
+      pool.info.has_flag(pg_pool_t::FLAG_NOSCRUB));
+  const bool allow_deep_scrub =
+    allow_regular_scrub &&
+    !(get_osdmap()->test_flag(CEPH_OSDMAP_NODEEP_SCRUB) ||
+      pool.info.has_flag(pg_pool_t::FLAG_NODEEP_SCRUB));
+  const bool has_deep_errors = (info.stats.stats.sum.num_deep_scrub_errors > 0);
+  const bool try_to_auto_repair = (cct->_conf->osd_scrub_auto_repair &&
+                                   get_pgbackend()->auto_repair_supported());
+
+  dout(10) << __func__ << " pg: " << info.pgid
+           << " allow: " << allow_regular_scrub << "/" << allow_deep_scrub
+           << " deep errs: " << has_deep_errors
+           << " auto-repair: " << try_to_auto_repair << " ("
+           << cct->_conf->osd_scrub_auto_repair << ")" << dendl;
 
   auto upd_flags = m_planned_scrub;
 
@@ -1500,18 +1500,21 @@ std::optional<requested_scrub_t> PG::verify_scrub_mode() const
   upd_flags.auto_repair = false;
 
   if (upd_flags.must_scrub && !upd_flags.must_deep_scrub && has_deep_errors) {
-    osd->clog->error() << "osd." << osd->whoami << " pg " << info.pgid
-                      << " Regular scrub request, deep-scrub details will be lost";
+    osd->clog->error()
+      << "osd." << osd->whoami << " pg " << info.pgid
+      << " Regular scrub request, deep-scrub details will be lost";
   }
 
   if (!upd_flags.must_scrub) {
     // All periodic scrub handling goes here because must_scrub is
     // always set for must_deep_scrub and must_repair.
 
-    bool can_start_periodic =
-      verify_periodic_scrub_mode(allow_deep_scrub, try_to_auto_repair,
-                                allow_regular_scrub, has_deep_errors, upd_flags);
+    const bool can_start_periodic = verify_periodic_scrub_mode(
+      allow_deep_scrub, try_to_auto_repair, allow_regular_scrub,
+      has_deep_errors, upd_flags);
     if (!can_start_periodic) {
+      // "I don't want no scrub"
+      dout(20) << __func__ << ": no periodic scrubs allowed" << dendl;
       return std::nullopt;
     }
   }
@@ -1531,21 +1534,43 @@ std::optional<requested_scrub_t> PG::verify_scrub_mode() const
   return upd_flags;
 }
 
-void PG::reg_next_scrub()
+/*
+ * Note: on_info_history_change() is used in those two cases where we're not sure
+ * whether the role of the PG was changed, and if so - was this change relayed to the
+ * scrub-queue.
+ */
+void PG::on_info_history_change()
+{
+  dout(20) << __func__ << " for a " << (is_primary() ? "Primary" : "non-primary") <<dendl;
+
+  ceph_assert(m_scrubber);
+  m_scrubber->on_maybe_registration_change(m_planned_scrub);
+}
+
+void PG::reschedule_scrub()
 {
-  m_scrubber->reg_next_scrub(m_planned_scrub);
+  dout(20) << __func__ << " for a " << (is_primary() ? "Primary" : "non-primary") <<dendl;
+
+  // we are assuming no change in primary status
+  if (is_primary()) {
+    ceph_assert(m_scrubber);
+    m_scrubber->update_scrub_job(m_planned_scrub);
+  }
 }
 
-void PG::on_info_history_change()
+void PG::on_primary_status_change(bool was_primary, bool now_primary)
 {
-  if (m_scrubber) {
-    m_scrubber->unreg_next_scrub();
-    m_scrubber->reg_next_scrub(m_planned_scrub);
+  // make sure we have a working scrubber when becoming a primary
+
+  if (was_primary != now_primary) {
+    ceph_assert(m_scrubber);
+    m_scrubber->on_primary_change(m_planned_scrub);
   }
 }
 
 void PG::scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type)
 {
+  ceph_assert(m_scrubber);
   m_scrubber->scrub_requested(scrub_level, scrub_type, m_planned_scrub);
 }
 
@@ -1566,11 +1591,17 @@ void PG::on_role_change() {
   plpg_on_role_change();
 }
 
-void PG::on_new_interval() {
-  dout(20) << __func__ << " scrub_queued was " << scrub_queued << " flags: " << m_planned_scrub << dendl;
-  scrub_queued = false;
+void PG::on_new_interval()
+{
   projected_last_update = eversion_t();
   cancel_recovery();
+
+  assert(m_scrubber);
+  // log some scrub data before we react to the interval
+  dout(20) << __func__ << (is_scrub_queued_or_active() ? " scrubbing " : " ")
+           << "flags: " << m_planned_scrub << dendl;
+
+  m_scrubber->on_maybe_registration_change(m_planned_scrub);
 }
 
 epoch_t PG::oldest_stored_osdmap() {
@@ -2063,122 +2094,52 @@ void PG::repair_object(
   recovery_state.force_object_missing(bad_peers, soid, oi.version);
 }
 
-void PG::forward_scrub_event(ScrubAPI fn, epoch_t epoch_queued)
+void PG::forward_scrub_event(ScrubAPI fn, epoch_t epoch_queued, std::string_view desc)
 {
-  dout(20) << __func__ << " queued at: " << epoch_queued << dendl;
-  if (is_active() && m_scrubber) {
+  dout(20) << __func__ << ": " << desc << " queued at: " << epoch_queued << dendl;
+  ceph_assert(m_scrubber);
+  if (is_active()) {
     ((*m_scrubber).*fn)(epoch_queued);
   } else {
     // pg might be in the process of being deleted
     dout(5) << __func__ << " refusing to forward. " << (is_clean() ? "(clean) " : "(not clean) ") <<
-            (is_active() ? "(active) " : "(not active) ") <<  dendl;
+             (is_active() ? "(active) " : "(not active) ") <<  dendl;
   }
 }
 
-void PG::replica_scrub(OpRequestRef op, ThreadPool::TPHandle& handle)
+void PG::forward_scrub_event(ScrubSafeAPI fn,
+                            epoch_t epoch_queued,
+                            Scrub::act_token_t act_token,
+                            std::string_view desc)
 {
-  dout(10) << __func__ << " (op)" << dendl;
-  if (m_scrubber)
-    m_scrubber->replica_scrub_op(op);
-}
-
-void PG::scrub(epoch_t epoch_queued, ThreadPool::TPHandle& handle)
-{
-  dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
-  // a new scrub
-  scrub_queued = false;
-  forward_scrub_event(&ScrubPgIF::initiate_regular_scrub, epoch_queued);
+  dout(20) << __func__ << ": " << desc << " queued: " << epoch_queued
+          << " token: " << act_token << dendl;
+  ceph_assert(m_scrubber);
+  if (is_active()) {
+    ((*m_scrubber).*fn)(epoch_queued, act_token);
+  } else {
+    // pg might be in the process of being deleted
+    dout(5) << __func__ << " refusing to forward. "
+           << (is_clean() ? "(clean) " : "(not clean) ")
+           << (is_active() ? "(active) " : "(not active) ") << dendl;
+  }
 }
 
-// note: no need to secure OSD resources for a recovery scrub
-void PG::recovery_scrub(epoch_t epoch_queued,
-                        [[maybe_unused]] ThreadPool::TPHandle& handle)
+void PG::replica_scrub(OpRequestRef op, ThreadPool::TPHandle& handle)
 {
-  dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
-  // a new scrub
-  scrub_queued = false;
-  forward_scrub_event(&ScrubPgIF::initiate_scrub_after_repair, epoch_queued);
+  dout(10) << __func__ << " (op)" << dendl;
+  ceph_assert(m_scrubber);
+  m_scrubber->replica_scrub_op(op);
 }
 
 void PG::replica_scrub(epoch_t epoch_queued,
+                      Scrub::act_token_t act_token,
                       [[maybe_unused]] ThreadPool::TPHandle& handle)
 {
   dout(10) << __func__ << " queued at: " << epoch_queued
           << (is_primary() ? " (primary)" : " (replica)") << dendl;
-  scrub_queued = false;
-  forward_scrub_event(&ScrubPgIF::send_start_replica, epoch_queued);
-}
-
-void PG::scrub_send_scrub_resched(epoch_t epoch_queued,
-                                 [[maybe_unused]] ThreadPool::TPHandle& handle)
-{
-  dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
-  scrub_queued = false;
-  forward_scrub_event(&ScrubPgIF::send_scrub_resched, epoch_queued);
-}
-
-void PG::scrub_send_resources_granted(epoch_t epoch_queued,
-                                     [[maybe_unused]] ThreadPool::TPHandle& handle)
-{
-  dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
-  forward_scrub_event(&ScrubPgIF::send_remotes_reserved, epoch_queued);
-}
-
-void PG::scrub_send_resources_denied(epoch_t epoch_queued,
-                                    [[maybe_unused]] ThreadPool::TPHandle& handle)
-{
-  dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
-  forward_scrub_event(&ScrubPgIF::send_reservation_failure, epoch_queued);
-}
-
-void PG::replica_scrub_resched(epoch_t epoch_queued,
-                              [[maybe_unused]] ThreadPool::TPHandle& handle)
-{
-  dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
-  scrub_queued = false;
-  forward_scrub_event(&ScrubPgIF::send_sched_replica, epoch_queued);
-}
-
-void PG::scrub_send_pushes_update(epoch_t epoch_queued,
-                                 [[maybe_unused]] ThreadPool::TPHandle& handle)
-{
-  dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
-  forward_scrub_event(&ScrubPgIF::active_pushes_notification, epoch_queued);
-}
-
-void PG::scrub_send_replica_pushes(epoch_t epoch_queued,
-                                  [[maybe_unused]] ThreadPool::TPHandle& handle)
-{
-  dout(15) << __func__ << " queued at: " << epoch_queued << dendl;
-  forward_scrub_event(&ScrubPgIF::send_replica_pushes_upd, epoch_queued);
-}
-
-void PG::scrub_send_applied_update(epoch_t epoch_queued,
-                                  [[maybe_unused]] ThreadPool::TPHandle& handle)
-{
-  dout(15) << __func__ << " queued at: " << epoch_queued << dendl;
-  forward_scrub_event(&ScrubPgIF::update_applied_notification, epoch_queued);
-}
-
-void PG::scrub_send_unblocking(epoch_t epoch_queued,
-                              [[maybe_unused]] ThreadPool::TPHandle& handle)
-{
-  dout(15) << __func__ << " queued at: " << epoch_queued << dendl;
-  forward_scrub_event(&ScrubPgIF::send_scrub_unblock, epoch_queued);
-}
-
-void PG::scrub_send_digest_update(epoch_t epoch_queued,
-                                 [[maybe_unused]] ThreadPool::TPHandle& handle)
-{
-  dout(15) << __func__ << " queued at: " << epoch_queued << dendl;
-  forward_scrub_event(&ScrubPgIF::digest_update_notification, epoch_queued);
-}
-
-void PG::scrub_send_replmaps_ready(epoch_t epoch_queued,
-                                  [[maybe_unused]] ThreadPool::TPHandle& handle)
-{
-  dout(15) << __func__ << " queued at: " << epoch_queued << dendl;
-  forward_scrub_event(&ScrubPgIF::send_replica_maps_ready, epoch_queued);
+  forward_scrub_event(&ScrubPgIF::send_start_replica, epoch_queued, act_token,
+                     "StartReplica/nw");
 }
 
 bool PG::ops_blocked_by_scrub() const
@@ -2581,12 +2542,6 @@ void PG::handle_query_state(Formatter *f)
   dout(10) << "handle_query_state" << dendl;
   PeeringState::QueryState q(f);
   recovery_state.handle_event(q, 0);
-
-  // This code has moved to after the close of recovery_state array.
-  // I don't think that scrub is a recovery state
-  if (is_primary() && is_active() && m_scrubber && m_scrubber->is_scrub_active()) {
-    m_scrubber->handle_query_state(f);
-  }
 }
 
 void PG::init_collection_pool_opts()
@@ -2625,7 +2580,7 @@ std::pair<ghobject_t, bool> PG::do_delete_work(
       epoch_t e = get_osdmap()->get_epoch();
       PGRef pgref(this);
       auto delete_requeue_callback = new LambdaContext([this, pgref, e](int r) {
-        dout(20) << __func__ << " wake up at "
+        dout(20) << "do_delete_work() [cb] wake up at "
                  << ceph_clock_now()
                 << ", re-queuing delete" << dendl;
         std::scoped_lock locker{*this};
@@ -2673,10 +2628,14 @@ std::pair<ghobject_t, bool> PG::do_delete_work(
       max,
       &olist,
       &next);
-    if (!olist.empty()) {
-      dout(0) << __func__ << " additional unexpected onode list"
-              <<" (new onodes has appeared since PG removal started"
-              << olist << dendl;
+    for (auto& oid : olist) {
+      if (oid == pgmeta_oid) {
+        dout(20) << __func__ << " removing pgmeta object " << oid << dendl;
+      } else {
+        dout(0) << __func__ << " additional unexpected onode"
+                <<" new onode has appeared since PG removal started"
+                << oid << dendl;
+      }
     }
   }
 
@@ -2794,15 +2753,15 @@ void PG::dump_missing(Formatter *f)
   }
 }
 
-void PG::get_pg_stats(std::function<void(const pg_stat_t&, epoch_t lec)> f)
+void PG::with_pg_stats(std::function<void(const pg_stat_t&, epoch_t lec)>&& f)
 {
   std::lock_guard l{pg_stats_publish_lock};
-  if (pg_stats_publish_valid) {
-    f(pg_stats_publish, pg_stats_publish.get_effective_last_epoch_clean());
+  if (pg_stats_publish) {
+    f(*pg_stats_publish, pg_stats_publish->get_effective_last_epoch_clean());
   }
 }
 
-void PG::with_heartbeat_peers(std::function<void(int)> f)
+void PG::with_heartbeat_peers(std::function<void(int)>&& f)
 {
   std::lock_guard l{heartbeat_peer_lock};
   for (auto p : heartbeat_peers) {