// NOTE(review): SOURCE is a unified-diff fragment ('+'/'-' prefixed lines)
// of Ceph's ECBackend recovery code; hunk headers and most context lines
// are elided, so annotations are hedged where the missing context matters.
//
// RecoveryMessages batches the per-object read requests a recovery pass
// wants to issue.  The patch adds want_to_read: for each object, the set
// of shard ids the decoder originally wanted, so later retry logic can
// re-run minimum_to_decode against the shards actually still available.
struct RecoveryMessages {
map<hobject_t,
ECBackend::read_request_t> reads;
+ map<hobject_t, set<int>> want_to_read;
// read(): queue a single-extent read [off, off+len) for hoid from the
// pg_shards in `need`; _want_to_read is moved into the per-object map.
void read(
ECBackend *ec,
const hobject_t &hoid, uint64_t off, uint64_t len,
+ set<int> &&_want_to_read,
const set<pg_shard_t> &need,
bool attrs) {
list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
to_read.push_back(boost::make_tuple(off, len, 0));
assert(!reads.count(hoid));
+ want_to_read.insert(make_pair(hoid, std::move(_want_to_read)));
reads.insert(
make_pair(
hoid,
// (diff context elided: the read_request_t construction and the rest of
// the struct are not visible in this fragment; the stray `return;` below
// belongs to a different, elided hunk)
return;
// (diff fragment) dispatch of batched recovery reads: the new
// m.want_to_read map is forwarded to start_read_op alongside m.reads.
start_read_op(
priority,
+ m.want_to_read,
m.reads,
OpRequestRef(),
false, true);
// (diff fragment, different elided hunk) RecoveryMessages::read call
// site during recovery: `want` — presumably the shard set needed for
// decode, computed above in elided context — is now moved in as the new
// _want_to_read argument.
op.hoid,
op.recovery_progress.data_recovered_to,
amount,
+ std::move(want),
to_read,
op.recovery_progress.first && !op.obc);
op.extent_requested = make_pair(
// (diff fragment) ECSubWrite message handling: before applying a replica
// write, the patch asks the parent to preempt any in-progress replica
// scrub on this object so the write is not blocked behind the scrub.
// not conflict with ECSubWrite's operator<<.
MOSDECSubOpWrite *op = static_cast<MOSDECSubOpWrite*>(
_op->get_nonconst_req());
+ parent->maybe_preempt_replica_scrub(op->op.soid);
handle_sub_write(op->op.from, _op, op->op, _op->pg_trace);
return true;
}
// (diff fragment) read-reply completeness check: instead of recomputing
// a local want_to_read via get_want_to_read_shards, use the per-object
// set recorded on the ReadOp (rop.want_to_read) when asking the EC
// plugin whether the shards we `have` suffice to decode this object.
have.insert(j->first.shard);
dout(20) << __func__ << " have shard=" << j->first.shard << dendl;
}
- set<int> want_to_read, dummy_minimum;
- get_want_to_read_shards(&want_to_read);
+ set<int> dummy_minimum;
int err;
- if ((err = ec_impl->minimum_to_decode(want_to_read, have, &dummy_minimum)) < 0) {
+ if ((err = ec_impl->minimum_to_decode(rop.want_to_read[iter->first], have, &dummy_minimum)) < 0) {
dout(20) << __func__ << " minimum_to_decode failed" << dendl;
if (rop.in_progress.empty()) {
// If we don't have enough copies and we haven't sent reads for all shards
// get_all_avail_shards: collect, for hoid, which shard ids are available
// (`have`) and on which pg_shard each lives (`shards`).  The patch adds
// error_shards: shards that already returned a read error are skipped in
// every candidate loop below, so retried reads avoid known-bad peers.
void ECBackend::get_all_avail_shards(
const hobject_t &hoid,
+ const set<pg_shard_t> &error_shards,
set<int> &have,
map<shard_id_t, pg_shard_t> &shards,
bool for_recovery)
// (diff context elided: opening brace and loop header over acting shards)
++i) {
dout(10) << __func__ << ": checking acting " << *i << dendl;
const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
+ if (error_shards.find(*i) != error_shards.end())
+ continue;
if (!missing.is_missing(hoid)) {
assert(!have.count(i->shard));
have.insert(i->shard);
// (diff context elided: for_recovery path, loop over backfill shards)
get_parent()->get_backfill_shards().begin();
i != get_parent()->get_backfill_shards().end();
++i) {
+ if (error_shards.find(*i) != error_shards.end())
+ continue;
if (have.count(i->shard)) {
assert(shards.count(i->shard));
continue;
// (diff context elided: loop over additional location candidates; `m`
// is presumably that peer's missing set — confirm against full source)
if (m) {
assert(!(*m).is_missing(hoid));
}
+ if (error_shards.find(*i) != error_shards.end())
+ continue;
have.insert(i->shard);
shards.insert(make_pair(i->shard, *i));
}
// (diff fragment) caller with no prior read errors adapts to the widened
// get_all_avail_shards signature by passing an empty error_shards set,
// preserving the old behavior for the first-attempt read path.
set<int> have;
map<shard_id_t, pg_shard_t> shards;
+ set<pg_shard_t> error_shards;
- get_all_avail_shards(hoid, have, shards, for_recovery);
+ get_all_avail_shards(hoid, error_shards, have, shards, for_recovery);
set<int> need;
int r = ec_impl->minimum_to_decode(want, have, &need);
// get_remaining_shards: after some reads failed, choose which additional
// pg_shards to read from.  Reworked by the patch:
//  - takes `want` (shard ids needed for decode) and the prior read
//    `result`, whose errored shards are excluded via error_shards;
//  - asks the EC plugin for a minimum decodable set over the remaining
//    healthy shards and returns -EIO here if none exists (previously the
//    caller had to infer failure from an empty to_read set);
//  - only shards in `need` that were not already read (`avail`) are
//    queued, and that invariant is now asserted rather than filtered.
int ECBackend::get_remaining_shards(
const hobject_t &hoid,
const set<int> &avail,
+ const set<int> &want,
+ const read_result_t &result,
set<pg_shard_t> *to_read,
bool for_recovery)
{
set<int> have;
map<shard_id_t, pg_shard_t> shards;
+ set<pg_shard_t> error_shards;
+ for (auto &p : result.errors) {
+ error_shards.insert(p.first);
+ }
+
+ get_all_avail_shards(hoid, error_shards, have, shards, for_recovery);
+
+ set<int> need;
+ int r = ec_impl->minimum_to_decode(want, have, &need);
+ if (r < 0) {
+ dout(0) << __func__ << " not enough shards left to try for " << hoid
+ << " read result was " << result << dendl;
+ return -EIO;
+ }
- get_all_avail_shards(hoid, have, shards, for_recovery);
+ set<int> shards_left;
+ for (auto p : need) {
+ if (avail.find(p) == avail.end()) {
+ shards_left.insert(p);
+ }
+ }
- for (set<int>::iterator i = have.begin();
- i != have.end();
+ for (set<int>::iterator i = shards_left.begin();
+ i != shards_left.end();
++i) {
assert(shards.count(shard_id_t(*i)));
- if (avail.find(*i) == avail.end())
- to_read->insert(shards[shard_id_t(*i)]);
+ assert(avail.find(*i) == avail.end());
+ to_read->insert(shards[shard_id_t(*i)]);
}
return 0;
}
// start_read_op: now also receives the per-object want_to_read map and
// moves it into the ReadOp, so the completeness check and retry path can
// reuse the recorded shard sets without recomputing them.
void ECBackend::start_read_op(
int priority,
+ map<hobject_t, set<int>> &want_to_read,
map<hobject_t, read_request_t> &to_read,
OpRequestRef _op,
bool do_redundant_reads,
// (diff context elided: ReadOp construction into the in-flight map)
do_redundant_reads,
for_recovery,
_op,
+ std::move(want_to_read),
std::move(to_read))).first->second;
dout(10) << __func__ << ": starting " << op << dendl;
if (_op) {
return;
}
// (diff fragment) client read path: record, per object, the shard set
// produced by get_want_to_read_shards and hand the resulting map to
// start_read_op together with the read requests it already built.
+ map<hobject_t, set<int>> obj_want_to_read;
set<int> want_to_read;
get_want_to_read_shards(&want_to_read);
// (diff context elided: per-object read_request_t construction)
shards,
false,
c)));
+ obj_want_to_read.insert(make_pair(to_read.first, want_to_read));
}
start_read_op(
CEPH_MSG_PRIO_DEFAULT,
+ obj_want_to_read,
for_read_op,
OpRequestRef(),
fast_read, false);
// (diff fragment) retry path after a partial read failure.
// get_remaining_shards now receives the recorded want set and the
// completed read_result_t so already-errored shards are avoided, and it
// reports -EIO itself when decode is impossible — the old empty-set
// check (and its TODO) is dropped.  The hoid entry in rop.to_read is
// replaced in place instead of swapping in a fresh single-entry map,
// which preserves any other objects still tracked by this ReadOp.
already_read.insert(i->shard);
dout(10) << __func__ << " have/error shards=" << already_read << dendl;
set<pg_shard_t> shards;
- int r = get_remaining_shards(hoid, already_read, &shards, rop.for_recovery);
+ int r = get_remaining_shards(hoid, already_read, rop.want_to_read[hoid],
+ rop.complete[hoid], &shards, rop.for_recovery);
if (r)
return r;
- if (shards.empty())
- return -EIO;
-
- dout(10) << __func__ << " Read remaining shards " << shards << dendl;
- // TODOSAM: this doesn't seem right
list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
rop.to_read.find(hoid)->second.to_read;
GenContext<pair<RecoveryMessages *, read_result_t& > &> *c =
rop.to_read.find(hoid)->second.cb;
- map<hobject_t, read_request_t> for_read_op;
- for_read_op.insert(
- make_pair(
+ rop.to_read.erase(hoid);
+ rop.to_read.insert(make_pair(
hoid,
read_request_t(
offsets,
shards,
false,
c)));
-
- rop.to_read.swap(for_read_op);
do_read_op(rop);
return 0;
}
// (diff context: tail of an unrelated function whose body the diff
// elided — left untouched)
old_size));
}
// be_deep_scrub: reworked from a void one-shot loop into a resumable,
// chunk-at-a-time scrub step.  State (data_pos, running data_hash) now
// lives in the caller-owned ScrubMapBuilder `pos`; the function reads one
// stride per call, returns -EINPROGRESS to be called again, and returns 0
// when this object is finished (including all error outcomes, which are
// recorded on ScrubMap::object `o` rather than aborting the scrub).
-void ECBackend::be_deep_scrub(
+int ECBackend::be_deep_scrub(
const hobject_t &poid,
- uint32_t seed,
- ScrubMap::object &o,
- ThreadPool::TPHandle &handle) {
- bufferhash h(-1); // we always used -1
+ ScrubMap &map,
+ ScrubMapBuilder &pos,
+ ScrubMap::object &o)
+{
+ dout(10) << __func__ << " " << poid << " pos " << pos << dendl;
int r;
+
+ uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
+ CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
+
+ // debug knob: optional sleep per chunk to slow deep scrub in testing
+ utime_t sleeptime;
+ sleeptime.set_from_double(cct->_conf->osd_debug_deep_scrub_sleep);
+ if (sleeptime != utime_t()) {
+ lgeneric_derr(cct) << __func__ << " sleeping for " << sleeptime << dendl;
+ sleeptime.sleep();
+ }
+
+ // first chunk of this object: (re)seed the running hash with -1,
+ // matching the historical seed noted in the removed code above
+ if (pos.data_pos == 0) {
+ pos.data_hash = bufferhash(-1);
+ }
+
// round the read stride up to a whole EC chunk
uint64_t stride = cct->_conf->osd_deep_scrub_stride;
if (stride % sinfo.get_chunk_size())
stride += sinfo.get_chunk_size() - (stride % sinfo.get_chunk_size());
- uint64_t pos = 0;
- uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
-
- while (true) {
- bufferlist bl;
- handle.reset_tp_timeout();
- r = store->read(
- ch,
- ghobject_t(
- poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
- pos,
- stride, bl,
- fadvise_flags);
- if (r < 0)
- break;
- if (bl.length() % sinfo.get_chunk_size()) {
- r = -EIO;
- break;
- }
- pos += r;
- h << bl;
- if ((unsigned)r < stride)
- break;
+ // read exactly one stride at the saved offset (no internal loop; the
+ // caller re-invokes us while we return -EINPROGRESS)
+ bufferlist bl;
+ r = store->read(
+ ch,
+ ghobject_t(
+ poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+ pos.data_pos,
+ stride, bl,
+ fadvise_flags);
+ if (r < 0) {
+ dout(20) << __func__ << " " << poid << " got "
+ << r << " on read, read_error" << dendl;
+ o.read_error = true;
+ return 0;
}
-
- if (r == -EIO) {
- dout(0) << "_scan_list " << poid << " got "
- << r << " on read, read_error" << dendl;
+ // a partial EC chunk on disk is treated as a read error, not a crash
+ if (bl.length() % sinfo.get_chunk_size()) {
+ dout(20) << __func__ << " " << poid << " got "
+ << r << " on read, not chunk size " << sinfo.get_chunk_size() << " aligned"
+ << dendl;
o.read_error = true;
- return;
+ return 0;
+ }
+ if (r > 0) {
+ pos.data_hash << bl;
+ }
+ pos.data_pos += r;
+ // a full stride read means there may be more data: ask to be resumed
+ if (r == (int)stride) {
+ return -EINPROGRESS;
}
// (diff context elided here: the `if (!hinfo)` guard around the error
// branch below is not visible in this fragment)
ECUtil::HashInfoRef hinfo = get_hash_info(poid, false, &o.attrs);
dout(0) << "_scan_list " << poid << " could not retrieve hash info" << dendl;
o.read_error = true;
o.digest_present = false;
- return;
+ return 0;
} else {
if (!get_parent()->get_pool().allows_ecoverwrites()) {
assert(hinfo->has_chunk_hash());
// verify total bytes read against the stored chunk-size metadata
- if (hinfo->get_total_chunk_size() != pos) {
- dout(0) << "_scan_list " << poid << " got incorrect size on read" << dendl;
+ if (hinfo->get_total_chunk_size() != (unsigned)pos.data_pos) {
+ dout(0) << "_scan_list " << poid << " got incorrect size on read 0x"
+ << std::hex << pos
+ << " expected 0x" << hinfo->get_total_chunk_size() << std::dec
+ << dendl;
o.ec_size_mismatch = true;
- return;
+ return 0;
}
// verify the running hash against this shard's stored chunk hash
- if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) != h.digest()) {
- dout(0) << "_scan_list " << poid << " got incorrect hash on read" << dendl;
+ if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) !=
+ pos.data_hash.digest()) {
+ dout(0) << "_scan_list " << poid << " got incorrect hash on read 0x"
+ << std::hex << pos.data_hash.digest() << " != expected 0x"
+ << hinfo->get_chunk_hash(get_parent()->whoami_shard().shard)
+ << std::dec << dendl;
o.ec_hash_mismatch = true;
- return;
+ return 0;
}
/* We checked above that we match our own stored hash. We cannot
}
}
// EC objects carry no omap; the digest is now a fixed -1 instead of the
// removed caller-supplied `seed` parameter
- o.omap_digest = seed;
+ o.omap_digest = -1;
o.omap_digest_present = true;
+ return 0;
}