return store->read(ch, ghobject_t(hoid), off, len, *bl, op_flags);
}
+// Synchronously read a set of extents from one object in a single vectored
+// read. `m` maps offset -> length; it is consumed (moved into an
+// interval_set) and, on success, handed back to the caller through the same
+// rvalue-reference parameter, possibly trimmed by the store to the extents
+// actually read. On error `m` is left in a moved-from state.
+int ReplicatedBackend::objects_readv_sync(
+ const hobject_t &hoid,
+ map<uint64_t, uint64_t>&& m,
+ uint32_t op_flags,
+ bufferlist *bl)
+{
+ // Steal the caller's extent map rather than copying it.
+ interval_set<uint64_t> im(std::move(m));
+ auto r = store->readv(ch, ghobject_t(hoid), im, *bl, op_flags);
+ if (r >= 0) {
+ // Return the extent map to the caller; detach() on the rvalue releases
+ // the interval_set's underlying map without a copy.
+ m = std::move(im).detach();
+ }
+ return r;
+}
+
void ReplicatedBackend::objects_read_async(
const hobject_t &hoid,
const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
class C_OSD_OnOpCommit : public Context {
ReplicatedBackend *pg;
- ReplicatedBackend::InProgressOpRef op;
+ ceph::ref_t<ReplicatedBackend::InProgressOp> op;
public:
- C_OSD_OnOpCommit(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op)
- : pg(pg), op(op) {}
+ C_OSD_OnOpCommit(ReplicatedBackend *pg, ceph::ref_t<ReplicatedBackend::InProgressOp> op)
+ : pg(pg), op(std::move(op)) {}
void finish(int) override {
pg->op_commit(op);
}
vector<pg_log_entry_t> &log_entries,
ObjectStore::Transaction *t,
set<hobject_t> *added,
- set<hobject_t> *removed)
+ set<hobject_t> *removed,
+ const ceph_release_t require_osd_release = ceph_release_t::unknown )
{
ceph_assert(t);
ceph_assert(added);
[&](const PGTransaction::ObjectOperation::Init::None &) {
},
[&](const PGTransaction::ObjectOperation::Init::Create &op) {
- t->touch(coll, goid);
+ if (require_osd_release >= ceph_release_t::octopus) {
+ t->create(coll, goid);
+ } else {
+ t->touch(coll, goid);
+ }
},
[&](const PGTransaction::ObjectOperation::Init::Clone &op) {
t->clone(
case UpdateType::Insert:
t->omap_setkeys(coll, goid, up.second);
break;
+ case UpdateType::RemoveRange:
+ t->omap_rmkeyrange(coll, goid, up.second);
+ break;
}
}
goid,
extent.get_off(),
extent.get_len(),
- op.buffer);
+ op.buffer,
+ op.fadvise_flags);
},
[&](const BufferUpdate::Zero &op) {
t->zero(
const eversion_t &at_version,
PGTransactionUPtr &&_t,
const eversion_t &trim_to,
- const eversion_t &roll_forward_to,
+ const eversion_t &min_last_complete_ondisk,
const vector<pg_log_entry_t> &_log_entries,
- boost::optional<pg_hit_set_history_t> &hset_history,
+ std::optional<pg_hit_set_history_t> &hset_history,
Context *on_all_commit,
ceph_tid_t tid,
osd_reqid_t reqid,
log_entries,
&op_t,
&added,
- &removed);
+ &removed,
+ get_osdmap()->require_osd_release);
ceph_assert(added.size() <= 1);
ceph_assert(removed.size() <= 1);
auto insert_res = in_progress_ops.insert(
make_pair(
tid,
- new InProgressOp(
+ ceph::make_ref<InProgressOp>(
tid, on_all_commit,
orig_op, at_version)
)
tid,
reqid,
trim_to,
- at_version,
+ min_last_complete_ondisk,
added.size() ? *(added.begin()) : hobject_t(),
removed.size() ? *(removed.begin()) : hobject_t(),
log_entries,
hset_history,
trim_to,
at_version,
+ min_last_complete_ondisk,
true,
op_t);
}
}
-void ReplicatedBackend::op_commit(
- InProgressOpRef& op)
+void ReplicatedBackend::op_commit(const ceph::ref_t<InProgressOp>& op)
{
if (op->on_commit == nullptr) {
// aborted
void ReplicatedBackend::do_repop_reply(OpRequestRef op)
{
static_cast<MOSDRepOpReply*>(op->get_nonconst_req())->finish_decode();
- const MOSDRepOpReply *r = static_cast<const MOSDRepOpReply *>(op->get_req());
+ auto r = op->get_req<MOSDRepOpReply>();
ceph_assert(r->get_header().type == MSG_OSD_REPOPREPLY);
op->mark_started();
auto iter = in_progress_ops.find(rep_tid);
if (iter != in_progress_ops.end()) {
InProgressOp &ip_op = *iter->second;
- const MOSDOp *m = NULL;
+ const MOSDOp *m = nullptr;
if (ip_op.op)
- m = static_cast<const MOSDOp *>(ip_op.op->get_req());
+ m = ip_op.op->get_req<MOSDOp>();
if (m)
dout(7) << __func__ << ": tid " << ip_op.tid << " op " //<< *m
void ReplicatedBackend::_do_push(OpRequestRef op)
{
- const MOSDPGPush *m = static_cast<const MOSDPGPush *>(op->get_req());
+ auto m = op->get_req<MOSDPGPush>();
ceph_assert(m->get_type() == MSG_OSD_PG_PUSH);
pg_shard_t from = m->from;
int started = bc->start_pushes(i.hoid, obc, h);
if (started < 0) {
bc->pushing[i.hoid].clear();
- bc->get_parent()->primary_failed(i.hoid);
- bc->get_parent()->primary_error(i.hoid, obc->obs.oi.version);
+ bc->get_parent()->on_failed_pull(
+ { bc->get_parent()->whoami_shard() },
+ i.hoid, obc->obs.oi.version);
} else if (!started) {
bc->get_parent()->on_global_recover(
i.hoid, i.stat, false);
void ReplicatedBackend::_do_pull_response(OpRequestRef op)
{
- const MOSDPGPush *m = static_cast<const MOSDPGPush *>(op->get_req());
+ auto m = op->get_req<MOSDPGPush>();
ceph_assert(m->get_type() == MSG_OSD_PG_PUSH);
pg_shard_t from = m->from;
void ReplicatedBackend::do_push_reply(OpRequestRef op)
{
- const MOSDPGPushReply *m = static_cast<const MOSDPGPushReply *>(op->get_req());
+ auto m = op->get_req<MOSDPGPushReply>();
ceph_assert(m->get_type() == MSG_OSD_PG_PUSH_REPLY);
pg_shard_t from = m->from;
ceph_tid_t tid,
osd_reqid_t reqid,
eversion_t pg_trim_to,
- eversion_t pg_roll_forward_to,
+ eversion_t min_last_complete_ondisk,
hobject_t new_temp_oid,
hobject_t discard_temp_oid,
const bufferlist &log_entries,
- boost::optional<pg_hit_set_history_t> &hset_hist,
+ std::optional<pg_hit_set_history_t> &hset_hist,
ObjectStore::Transaction &op_t,
pg_shard_t peer,
const pg_info_t &pinfo)
wr->pg_stats = get_info().stats;
wr->pg_trim_to = pg_trim_to;
- wr->pg_roll_forward_to = pg_roll_forward_to;
+
+ if (HAVE_FEATURE(parent->min_peer_features(), OSD_REPOP_MLCOD)) {
+ wr->min_last_complete_ondisk = min_last_complete_ondisk;
+ } else {
+ /* Some replicas need this field to be at_version. New replicas
+ * will ignore it */
+ wr->set_rollback_to(at_version);
+ }
wr->new_temp_oid = new_temp_oid;
wr->discard_temp_oid = discard_temp_oid;
ceph_tid_t tid,
osd_reqid_t reqid,
eversion_t pg_trim_to,
- eversion_t pg_roll_forward_to,
+ eversion_t min_last_complete_ondisk,
hobject_t new_temp_oid,
hobject_t discard_temp_oid,
const vector<pg_log_entry_t> &log_entries,
- boost::optional<pg_hit_set_history_t> &hset_hist,
+ std::optional<pg_hit_set_history_t> &hset_hist,
InProgressOp *op,
ObjectStore::Transaction &op_t)
{
tid,
reqid,
pg_trim_to,
- pg_roll_forward_to,
+ min_last_complete_ondisk,
new_temp_oid,
discard_temp_oid,
logs,
void ReplicatedBackend::do_repop(OpRequestRef op)
{
static_cast<MOSDRepOp*>(op->get_nonconst_req())->finish_decode();
- const MOSDRepOp *m = static_cast<const MOSDRepOp *>(op->get_req());
+ auto m = op->get_req<MOSDRepOp>();
int msg_type = m->get_type();
ceph_assert(MSG_OSD_REPOP == msg_type);
log,
m->updated_hit_set_history,
m->pg_trim_to,
- m->pg_roll_forward_to,
+ m->version, /* Replicated PGs don't have rollback info */
+ m->min_last_complete_ondisk,
update_snaps,
rm->localt,
async);
rm->committed = true;
// send commit.
- const MOSDRepOp *m = static_cast<const MOSDRepOp*>(rm->op->get_req());
+ auto m = rm->op->get_req<MOSDRepOp>();
ceph_assert(m->get_type() == MSG_OSD_REPOP);
dout(10) << __func__ << " on op " << *m
<< ", sending commit to osd." << rm->ackerosd
if (size)
data_subset.insert(0, size);
+ if (HAVE_FEATURE(parent->min_peer_features(), SERVER_OCTOPUS)) {
+ const auto it = missing.get_items().find(head);
+ assert(it != missing.get_items().end());
+ data_subset.intersection_of(it->second.clean_regions.get_dirty_regions());
+ dout(10) << "calc_head_subsets " << head
+ << " data_subset " << data_subset << dendl;
+ }
+
if (get_parent()->get_pool().allow_incomplete_clones()) {
dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
return;
interval_set<uint64_t> cloning;
interval_set<uint64_t> prev;
+ hobject_t c = head;
if (size)
prev.insert(0, size);
for (int j=snapset.clones.size()-1; j>=0; j--) {
- hobject_t c = head;
c.snap = snapset.clones[j];
prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
if (!missing.is_missing(c) &&
get_parent()->try_lock_for_read(c, manager)) {
dout(10) << "calc_head_subsets " << head << " has prev " << c
<< " overlap " << prev << dendl;
- clone_subsets[c] = prev;
- cloning.union_of(prev);
+ cloning = prev;
break;
}
dout(10) << "calc_head_subsets " << head << " does not have prev " << c
<< " overlap " << prev << dendl;
}
+ cloning.intersection_of(data_subset);
+ if (cloning.empty()) {
+ dout(10) << "skipping clone, nothing needs to clone" << dendl;
+ return;
+ }
- if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
+ if (cloning.num_intervals() > g_conf().get_val<uint64_t>("osd_recover_clone_overlap_limit")) {
dout(10) << "skipping clone, too many holes" << dendl;
get_parent()->release_locks(manager);
clone_subsets.clear();
cloning.clear();
+ return;
}
// what's left for us to push?
+ clone_subsets[c] = cloning;
data_subset.subtract(cloning);
dout(10) << "calc_head_subsets " << head
<< " overlap " << next << dendl;
}
- if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
+ if (cloning.num_intervals() > g_conf().get_val<uint64_t>("osd_recover_clone_overlap_limit")) {
dout(10) << "skipping clone, too many holes" << dendl;
get_parent()->release_locks(manager);
clone_subsets.clear();
ObjectContextRef headctx,
RPGHandle *h)
{
- ceph_assert(get_parent()->get_local_missing().get_items().count(soid));
- eversion_t _v = get_parent()->get_local_missing().get_items().find(
- soid)->second.need;
+ const auto missing_iter = get_parent()->get_local_missing().get_items().find(soid);
+ ceph_assert(missing_iter != get_parent()->get_local_missing().get_items().end());
+ eversion_t _v = missing_iter->second.need;
ceph_assert(_v == v);
const map<hobject_t, set<pg_shard_t>> &missing_loc(
get_parent()->get_missing_loc_shards());
ceph_assert(ssc->snapset.clone_size.count(soid.snap));
recovery_info.size = ssc->snapset.clone_size[soid.snap];
+ recovery_info.object_exist = missing_iter->second.clean_regions.object_is_exist();
} else {
// pulling head or unversioned object.
// always pull the whole thing.
recovery_info.copy_subset.insert(0, (uint64_t)-1);
+ if (HAVE_FEATURE(parent->min_peer_features(), SERVER_OCTOPUS))
+ recovery_info.copy_subset.intersection_of(missing_iter->second.clean_regions.get_dirty_regions());
recovery_info.size = ((uint64_t)-1);
+ recovery_info.object_exist = missing_iter->second.clean_regions.object_is_exist();
}
h->pulls[fromshard].push_back(PullOp());
op.recovery_info.soid = soid;
op.recovery_info.version = v;
op.recovery_progress.data_complete = false;
- op.recovery_progress.omap_complete = false;
+ op.recovery_progress.omap_complete = !missing_iter->second.clean_regions.omap_is_dirty()
+ && HAVE_FEATURE(parent->min_peer_features(), SERVER_OCTOPUS);
op.recovery_progress.data_recovered_to = 0;
op.recovery_progress.first = true;
ObcLockManager &&lock_manager)
{
get_parent()->begin_peer_recover(peer, soid);
+ const auto pmissing_iter = get_parent()->get_shard_missing().find(peer);
+ const auto missing_iter = pmissing_iter->second.get_items().find(soid);
+ assert(missing_iter != pmissing_iter->second.get_items().end());
// take note.
PushInfo &pi = pushing[soid][peer];
pi.obc = obc;
pi.recovery_info.oi = obc->obs.oi;
pi.recovery_info.ss = pop->recovery_info.ss;
pi.recovery_info.version = version;
+ pi.recovery_info.object_exist = missing_iter->second.clean_regions.object_is_exist();
+ pi.recovery_progress.omap_complete = !missing_iter->second.clean_regions.omap_is_dirty() &&
+ HAVE_FEATURE(parent->min_peer_features(), SERVER_OCTOPUS);
pi.lock_manager = std::move(lock_manager);
ObjectRecoveryProgress new_progress;
const ObjectRecoveryInfo &recovery_info,
bool first,
bool complete,
+ bool clear_omap,
bool cache_dont_need,
+ interval_set<uint64_t> &data_zeros,
const interval_set<uint64_t> &intervals_included,
bufferlist data_included,
bufferlist omap_header,
}
if (first) {
- t->remove(coll, ghobject_t(target_oid));
- t->touch(coll, ghobject_t(target_oid));
+ if (!complete) {
+ t->remove(coll, ghobject_t(target_oid));
+ t->touch(coll, ghobject_t(target_oid));
+ bufferlist bv = attrs.at(OI_ATTR);
+ object_info_t oi(bv);
+ t->set_alloc_hint(coll, ghobject_t(target_oid),
+ oi.expected_object_size,
+ oi.expected_write_size,
+ oi.alloc_hint_flags);
+ } else {
+ if (!recovery_info.object_exist) {
+ t->remove(coll, ghobject_t(target_oid));
+ t->touch(coll, ghobject_t(target_oid));
+ bufferlist bv = attrs.at(OI_ATTR);
+ object_info_t oi(bv);
+ t->set_alloc_hint(coll, ghobject_t(target_oid),
+ oi.expected_object_size,
+ oi.expected_write_size,
+ oi.alloc_hint_flags);
+ }
+ // Remove existing xattrs; they will be set again later if we are overwriting an existing object
+ t->rmattrs(coll, ghobject_t(target_oid));
+ // If the omap needs to be updated, clear its previous contents first
+ if (clear_omap)
+ t->omap_clear(coll, ghobject_t(target_oid));
+ }
+
t->truncate(coll, ghobject_t(target_oid), recovery_info.size);
- if (omap_header.length())
+ if (omap_header.length())
t->omap_setheader(coll, ghobject_t(target_oid), omap_header);
- bufferlist bv = attrs.at(OI_ATTR);
- object_info_t oi(bv);
- t->set_alloc_hint(coll, ghobject_t(target_oid),
- oi.expected_object_size,
- oi.expected_write_size,
- oi.alloc_hint_flags);
+ struct stat st;
+ int r = store->stat(ch, ghobject_t(recovery_info.soid), &st);
if (get_parent()->pg_is_remote_backfilling()) {
- struct stat st;
uint64_t size = 0;
- int r = store->stat(ch, ghobject_t(recovery_info.soid), &st);
- if (r == 0) {
+ if (r == 0)
size = st.st_size;
- }
// Don't need to do anything if object is still the same size
if (size != recovery_info.oi.size) {
get_parent()->pg_add_local_num_bytes((int64_t)recovery_info.oi.size - (int64_t)size);
<< dendl;
}
}
+ if (!complete) {
+ // Clone the overlapping content from the existing local object
+ if (recovery_info.object_exist) {
+ assert(r == 0);
+ uint64_t local_size = std::min(recovery_info.size, (uint64_t)st.st_size);
+ interval_set<uint64_t> local_intervals_included, local_intervals_excluded;
+ if (local_size) {
+ local_intervals_included.insert(0, local_size);
+ local_intervals_excluded.intersection_of(local_intervals_included, recovery_info.copy_subset);
+ local_intervals_included.subtract(local_intervals_excluded);
+ }
+ for (interval_set<uint64_t>::const_iterator q = local_intervals_included.begin();
+ q != local_intervals_included.end();
+ ++q) {
+ dout(15) << " clone_range " << recovery_info.soid << " "
+ << q.get_start() << "~" << q.get_len() << dendl;
+ t->clone_range(coll, ghobject_t(recovery_info.soid), ghobject_t(target_oid),
+ q.get_start(), q.get_len(), q.get_start());
+ }
+ }
+ }
}
uint64_t off = 0;
uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL;
if (cache_dont_need)
fadvise_flags |= CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
+ // Punch zeros into regions that are marked dirty but for which fiemap reported no allocated data
+ if (data_zeros.size() > 0) {
+ data_zeros.intersection_of(recovery_info.copy_subset);
+ assert(intervals_included.subset_of(data_zeros));
+ data_zeros.subtract(intervals_included);
+
+ dout(20) << __func__ <<" recovering object " << recovery_info.soid
+ << " copy_subset: " << recovery_info.copy_subset
+ << " intervals_included: " << intervals_included
+ << " data_zeros: " << data_zeros << dendl;
+
+ for (auto p = data_zeros.begin(); p != data_zeros.end(); ++p)
+ t->zero(coll, ghobject_t(target_oid), p.get_start(), p.get_len());
+ }
for (interval_set<uint64_t>::const_iterator p = intervals_included.begin();
p != intervals_included.end();
++p) {
if (complete) {
if (!first) {
dout(10) << __func__ << ": Removing oid "
- << target_oid << " from the temp collection" << dendl;
+ << target_oid << " from the temp collection" << dendl;
clear_temp_obj(target_oid);
t->remove(coll, ghobject_t(recovery_info.soid));
t->collection_move_rename(coll, ghobject_t(target_oid),
- coll, ghobject_t(recovery_info.soid));
+ coll, ghobject_t(recovery_info.soid));
}
submit_push_complete(recovery_info, t);
+
}
}
const hobject_t &hoid = pop.soid;
ceph_assert((data_included.empty() && data.length() == 0) ||
- (!data_included.empty() && data.length() > 0));
+ (!data_included.empty() && data.length() > 0));
auto piter = pulling.find(hoid);
if (piter == pulling.end()) {
a.second.rebuild();
}
pi.obc = get_parent()->get_obc(pi.recovery_info.soid, attrset);
+ if (attrset.find(SS_ATTR) != attrset.end()) {
+ bufferlist ssbv = attrset.at(SS_ATTR);
+ SnapSet ss(ssbv);
+ assert(!pi.obc->ssc->exists || ss.seq == pi.obc->ssc->snapset.seq);
+ }
pi.recovery_info.oi = pi.obc->obs.oi;
pi.recovery_info = recalc_subsets(
pi.recovery_info,
pi.recovery_progress = pop.after_progress;
dout(10) << "new recovery_info " << pi.recovery_info
- << ", new progress " << pi.recovery_progress
- << dendl;
-
+ << ", new progress " << pi.recovery_progress
+ << dendl;
+ interval_set<uint64_t> data_zeros;
+ uint64_t z_offset = pop.before_progress.data_recovered_to;
+ uint64_t z_length = pop.after_progress.data_recovered_to - pop.before_progress.data_recovered_to;
+ if(z_length)
+ data_zeros.insert(z_offset, z_length);
bool complete = pi.is_complete();
-
- submit_push_data(pi.recovery_info, first,
- complete, pi.cache_dont_need,
- data_included, data,
- pop.omap_header,
- pop.attrset,
- pop.omap_entries,
- t);
+ bool clear_omap = !pop.before_progress.omap_complete;
+
+ submit_push_data(pi.recovery_info,
+ first,
+ complete,
+ clear_omap,
+ pi.cache_dont_need,
+ data_zeros,
+ data_included,
+ data,
+ pop.omap_header,
+ pop.attrset,
+ pop.omap_entries,
+ t);
pi.stat.num_keys_recovered += pop.omap_entries.size();
pi.stat.num_bytes_recovered += data.length();
bool first = pop.before_progress.first;
bool complete = pop.after_progress.data_complete &&
pop.after_progress.omap_complete;
-
+ bool clear_omap = !pop.before_progress.omap_complete;
+ interval_set<uint64_t> data_zeros;
+ uint64_t z_offset = pop.before_progress.data_recovered_to;
+ uint64_t z_length = pop.after_progress.data_recovered_to - pop.before_progress.data_recovered_to;
+ if(z_length)
+ data_zeros.insert(z_offset, z_length);
response->soid = pop.recovery_info.soid;
+
submit_push_data(pop.recovery_info,
first,
complete,
+ clear_omap,
true, // must be replicate
+ data_zeros,
pop.data_included,
data,
pop.omap_header,
int r = store->fiemap(ch, ghobject_t(recovery_info.soid), 0,
copy_subset.range_end(), m);
if (r >= 0) {
- interval_set<uint64_t> fiemap_included(m);
+ interval_set<uint64_t> fiemap_included(std::move(m));
copy_subset.intersection_of(fiemap_included);
} else {
// intersection of copy_subset and empty interval_set would be empty anyway
out_op->data_included.clear();
}
- for (interval_set<uint64_t>::iterator p = out_op->data_included.begin();
- p != out_op->data_included.end();
- ++p) {
- bufferlist bit;
- int r = store->read(ch, ghobject_t(recovery_info.soid),
- p.get_start(), p.get_len(), bit,
- cache_dont_need ? CEPH_OSD_OP_FLAG_FADVISE_DONTNEED: 0);
- if (cct->_conf->osd_debug_random_push_read_error &&
+ auto origin_size = out_op->data_included.size();
+ bufferlist bit;
+ int r = store->readv(ch, ghobject_t(recovery_info.soid),
+ out_op->data_included, bit,
+ cache_dont_need ? CEPH_OSD_OP_FLAG_FADVISE_DONTNEED: 0);
+ if (cct->_conf->osd_debug_random_push_read_error &&
(rand() % (int)(cct->_conf->osd_debug_random_push_read_error * 100.0)) == 0) {
- dout(0) << __func__ << ": inject EIO " << recovery_info.soid << dendl;
- r = -EIO;
- }
- if (r < 0) {
- return r;
- }
- if (p.get_len() != bit.length()) {
- dout(10) << " extent " << p.get_start() << "~" << p.get_len()
- << " is actually " << p.get_start() << "~" << bit.length()
- << dendl;
- interval_set<uint64_t>::iterator save = p++;
- if (bit.length() == 0)
- out_op->data_included.erase(save); //Remove this empty interval
- else
- save.set_len(bit.length());
- // Remove any other intervals present
- while (p != out_op->data_included.end()) {
- interval_set<uint64_t>::iterator save = p++;
- out_op->data_included.erase(save);
- }
- new_progress.data_complete = true;
- out_op->data.claim_append(bit);
- break;
- }
- out_op->data.claim_append(bit);
+ dout(0) << __func__ << ": inject EIO " << recovery_info.soid << dendl;
+ r = -EIO;
}
+ if (r < 0) {
+ return r;
+ }
+ if (out_op->data_included.size() != origin_size) {
+ dout(10) << __func__ << " some extents get pruned "
+ << out_op->data_included.size() << "/" << origin_size
+ << dendl;
+ new_progress.data_complete = true;
+ }
+ out_op->data.claim_append(bit);
if (progress.first && !out_op->data_included.empty() &&
out_op->data_included.begin().get_start() == 0 &&
out_op->data.length() == oi.size && oi.is_data_digest()) {
if (get_parent()->pg_is_repair())
stat->num_objects_repaired++;
}
+ } else if (progress.first && progress.omap_complete) {
+ // The omap was initially marked complete (unchanged), but it must still
+ // be recovered if recovery cannot finish in a single pass
+ new_progress.omap_complete = false;
}
if (stat) {
if (!error)
get_parent()->on_global_recover(soid, stat, false);
else
- get_parent()->on_primary_error(soid, v);
+ get_parent()->on_failed_pull(
+ std::set<pg_shard_t>{ get_parent()->whoami_shard() },
+ soid,
+ v);
pushing.erase(soid);
} else {
// This looks weird, but we erased the current peer and need to remember
if (progress.first && recovery_info.size == ((uint64_t)-1)) {
// Adjust size and copy_subset
recovery_info.size = st.st_size;
- recovery_info.copy_subset.clear();
- if (st.st_size)
- recovery_info.copy_subset.insert(0, st.st_size);
- ceph_assert(recovery_info.clone_subset.empty());
+ if (st.st_size) {
+ interval_set<uint64_t> object_range;
+ object_range.insert(0, st.st_size);
+ recovery_info.copy_subset.intersection_of(object_range);
+ } else {
+ recovery_info.copy_subset.clear();
+ }
+ assert(recovery_info.clone_subset.empty());
}
r = build_push_op(recovery_info, progress, 0, reply);
void ReplicatedBackend::_failed_pull(pg_shard_t from, const hobject_t &soid)
{
dout(20) << __func__ << ": " << soid << " from " << from << dendl;
- list<pg_shard_t> fl = { from };
auto it = pulling.find(soid);
assert(it != pulling.end());
- get_parent()->failed_push(fl, soid, it->second.recovery_info.version);
+ // Report the failing source shard so the parent can drop it from the
+ // missing-loc candidates and retry the pull from another replica.
+ get_parent()->on_failed_pull(
+ { from },
+ soid,
+ it->second.recovery_info.version);
clear_pull(it);
}