]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osd/ReplicatedBackend.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / osd / ReplicatedBackend.cc
index 50a6c60d7f0f5f12ee350dd66b89332415ab7879..99535ed5feb97c2a4e0158975e597dd0d48553bf 100644 (file)
@@ -68,12 +68,14 @@ class PG_SendMessageOnConn: public Context {
 class PG_RecoveryQueueAsync : public Context {
   PGBackend::Listener *pg;
   unique_ptr<GenContext<ThreadPool::TPHandle&>> c;
+  uint64_t cost;
   public:
   PG_RecoveryQueueAsync(
     PGBackend::Listener *pg,
-    GenContext<ThreadPool::TPHandle&> *c) : pg(pg), c(c) {}
+    GenContext<ThreadPool::TPHandle&> *c,
+    uint64_t cost) : pg(pg), c(c), cost(cost) {}
   void finish(int) override {
-    pg->schedule_recovery_work(c.release());
+    pg->schedule_recovery_work(c.release(), cost);
   }
 };
 }
@@ -190,7 +192,7 @@ void ReplicatedBackend::check_recovery_sources(const OSDMapRef& osdmap)
 
 bool ReplicatedBackend::can_handle_while_inactive(OpRequestRef op)
 {
-  dout(10) << __func__ << ": " << op << dendl;
+  dout(10) << __func__ << ": " << *op->get_req() << dendl;
   switch (op->get_req()->get_type()) {
   case MSG_OSD_PG_PULL:
     return true;
@@ -203,7 +205,7 @@ bool ReplicatedBackend::_handle_message(
   OpRequestRef op
   )
 {
-  dout(10) << __func__ << ": " << op << dendl;
+  dout(10) << __func__ << ": " << *op->get_req() << dendl;
   switch (op->get_req()->get_type()) {
   case MSG_OSD_PG_PUSH:
     do_push(op);
@@ -504,10 +506,6 @@ void ReplicatedBackend::submit_transaction(
   ceph_assert(insert_res.second);
   InProgressOp &op = *insert_res.first->second;
 
-  jspan span;
-  if (orig_op) {
-    span = tracing::osd::tracer.add_span("ReplicatedBackend::submit_transaction", orig_op->osd_parent_span);
-  }
 
   op.waiting_for_commit.insert(
     parent->get_acting_recovery_backfill_shards().begin(),
@@ -657,13 +655,15 @@ int ReplicatedBackend::be_deep_scrub(
       pos.data_hash = bufferhash(-1);
     }
 
+    const uint64_t stride = cct->_conf->osd_deep_scrub_stride;
+
     bufferlist bl;
     r = store->read(
       ch,
       ghobject_t(
        poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
       pos.data_pos,
-      cct->_conf->osd_deep_scrub_stride, bl,
+      stride, bl,
       fadvise_flags);
     if (r < 0) {
       dout(20) << __func__ << "  " << poid << " got "
@@ -675,7 +675,7 @@ int ReplicatedBackend::be_deep_scrub(
       pos.data_hash << bl;
     }
     pos.data_pos += r;
-    if (static_cast<uint64_t>(r) == cct->_conf->osd_deep_scrub_stride) {
+    if (static_cast<uint64_t>(r) == stride) {
       dout(20) << __func__ << "  " << poid << " more data, digest so far 0x"
               << std::hex << pos.data_hash.digest() << std::dec << dendl;
       return -EINPROGRESS;
@@ -819,8 +819,11 @@ struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
   ReplicatedBackend *bc;
   list<ReplicatedBackend::pull_complete_info> to_continue;
   int priority;
-  C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
-    : bc(bc), priority(priority) {}
+  C_ReplicatedBackend_OnPullComplete(
+    ReplicatedBackend *bc,
+    int priority,
+    list<ReplicatedBackend::pull_complete_info> &&to_continue)
+    : bc(bc), to_continue(std::move(to_continue)), priority(priority) {}
 
   void finish(ThreadPool::TPHandle &handle) override {
     ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
@@ -843,6 +846,15 @@ struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
     }
     bc->run_recovery_op(h, priority);
   }
+
+  /// Estimate total data reads required to perform pushes
+  uint64_t estimate_push_costs() const {
+    uint64_t cost = 0;
+    for (const auto &i: to_continue) {
+      cost += i.stat.num_bytes_recovered;
+    }
+    return cost;
+  }
 };
 
 void ReplicatedBackend::_do_pull_response(OpRequestRef op)
@@ -872,12 +884,13 @@ void ReplicatedBackend::_do_pull_response(OpRequestRef op)
     C_ReplicatedBackend_OnPullComplete *c =
       new C_ReplicatedBackend_OnPullComplete(
        this,
-       m->get_priority());
-    c->to_continue.swap(to_continue);
+       m->get_priority(),
+       std::move(to_continue));
     t.register_on_complete(
       new PG_RecoveryQueueAsync(
        get_parent(),
-       get_parent()->bless_unlocked_gencontext(c)));
+       get_parent()->bless_unlocked_gencontext(c),
+        std::max<uint64_t>(1, c->estimate_push_costs())));
   }
   replies.erase(replies.end() - 1);
 
@@ -1062,10 +1075,6 @@ void ReplicatedBackend::do_repop(OpRequestRef op)
           << " " << m->logbl.length()
           << dendl;
 
-  jspan span;
-  if (op) {
-    span = tracing::osd::tracer.add_span(__func__, op->osd_parent_span);
-  }
 
   // sanity checks
   ceph_assert(m->map_epoch >= get_info().history.same_interval_since);