class PG_RecoveryQueueAsync : public Context {
PGBackend::Listener *pg;
unique_ptr<GenContext<ThreadPool::TPHandle&>> c;
+ uint64_t cost;
public:
PG_RecoveryQueueAsync(
PGBackend::Listener *pg,
- GenContext<ThreadPool::TPHandle&> *c) : pg(pg), c(c) {}
+ GenContext<ThreadPool::TPHandle&> *c,
+ uint64_t cost) : pg(pg), c(c), cost(cost) {}
void finish(int) override {
- pg->schedule_recovery_work(c.release());
+ pg->schedule_recovery_work(c.release(), cost);
}
};
}
bool ReplicatedBackend::can_handle_while_inactive(OpRequestRef op)
{
- dout(10) << __func__ << ": " << op << dendl;
+ dout(10) << __func__ << ": " << *op->get_req() << dendl;
switch (op->get_req()->get_type()) {
case MSG_OSD_PG_PULL:
return true;
OpRequestRef op
)
{
- dout(10) << __func__ << ": " << op << dendl;
+ dout(10) << __func__ << ": " << *op->get_req() << dendl;
switch (op->get_req()->get_type()) {
case MSG_OSD_PG_PUSH:
do_push(op);
ceph_assert(insert_res.second);
InProgressOp &op = *insert_res.first->second;
- jspan span;
- if (orig_op) {
- span = tracing::osd::tracer.add_span("ReplicatedBackend::submit_transaction", orig_op->osd_parent_span);
- }
op.waiting_for_commit.insert(
parent->get_acting_recovery_backfill_shards().begin(),
pos.data_hash = bufferhash(-1);
}
+ const uint64_t stride = cct->_conf->osd_deep_scrub_stride;
+
bufferlist bl;
r = store->read(
ch,
ghobject_t(
poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
pos.data_pos,
- cct->_conf->osd_deep_scrub_stride, bl,
+ stride, bl,
fadvise_flags);
if (r < 0) {
dout(20) << __func__ << " " << poid << " got "
pos.data_hash << bl;
}
pos.data_pos += r;
- if (static_cast<uint64_t>(r) == cct->_conf->osd_deep_scrub_stride) {
+ if (static_cast<uint64_t>(r) == stride) {
dout(20) << __func__ << " " << poid << " more data, digest so far 0x"
<< std::hex << pos.data_hash.digest() << std::dec << dendl;
return -EINPROGRESS;
ReplicatedBackend *bc;
list<ReplicatedBackend::pull_complete_info> to_continue;
int priority;
- C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
- : bc(bc), priority(priority) {}
+ C_ReplicatedBackend_OnPullComplete(
+ ReplicatedBackend *bc,
+ int priority,
+ list<ReplicatedBackend::pull_complete_info> &&to_continue)
+ : bc(bc), to_continue(std::move(to_continue)), priority(priority) {}
void finish(ThreadPool::TPHandle &handle) override {
ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
}
bc->run_recovery_op(h, priority);
}
+
+ /// Estimate total data reads required to perform pushes
+ uint64_t estimate_push_costs() const {
+ uint64_t cost = 0;
+ for (const auto &i: to_continue) {
+ cost += i.stat.num_bytes_recovered;
+ }
+ return cost;
+ }
};
void ReplicatedBackend::_do_pull_response(OpRequestRef op)
C_ReplicatedBackend_OnPullComplete *c =
new C_ReplicatedBackend_OnPullComplete(
this,
- m->get_priority());
- c->to_continue.swap(to_continue);
+ m->get_priority(),
+ std::move(to_continue));
t.register_on_complete(
new PG_RecoveryQueueAsync(
get_parent(),
- get_parent()->bless_unlocked_gencontext(c)));
+ get_parent()->bless_unlocked_gencontext(c),
+ std::max<uint64_t>(1, c->estimate_push_costs())));
}
replies.erase(replies.end() - 1);
<< " " << m->logbl.length()
<< dendl;
- jspan span;
- if (op) {
- span = tracing::osd::tracer.add_span(__func__, op->osd_parent_span);
- }
// sanity checks
ceph_assert(m->map_epoch >= get_info().history.same_interval_since);