: reserved(false), reserve_failed(false),
epoch_start(0),
active(false),
- waiting_on(0), shallow_errors(0), deep_errors(0), fixed(0),
+ shallow_errors(0), deep_errors(0), fixed(0),
must_scrub(false), must_deep_scrub(false), must_repair(false),
auto_repair(false),
num_digest_updates_pending(0),
state(INACTIVE),
- deep(false),
- seed(0)
+ deep(false)
{}
+// Trivial destructor: Scrubber owns no resources needing explicit cleanup.
PG::Scrubber::~Scrubber() {}
<< scrubber.received_maps[m->from].valid_through
<< dendl;
- --scrubber.waiting_on;
+ dout(10) << __func__ << " waiting_on_whom was " << scrubber.waiting_on_whom
+ << dendl;
+ assert(scrubber.waiting_on_whom.count(m->from));
scrubber.waiting_on_whom.erase(m->from);
- if (scrubber.waiting_on == 0) {
+ if (m->preempted) {
+ dout(10) << __func__ << " replica was preempted, setting flag" << dendl;
+ scrub_preempted = true;
+ }
+ if (scrubber.waiting_on_whom.empty()) {
if (ops_blocked_by_scrub()) {
requeue_scrub(true);
} else {
<< scrubber.received_maps[m->from].valid_through
<< dendl;
- --scrubber.waiting_on;
scrubber.waiting_on_whom.erase(m->from);
- if (scrubber.waiting_on == 0) {
+ if (scrubber.waiting_on_whom.empty()) {
if (ops_blocked_by_scrub()) {
requeue_scrub(true);
} else {
void PG::_request_scrub_map(
pg_shard_t replica, eversion_t version,
hobject_t start, hobject_t end,
- bool deep, uint32_t seed)
+ bool deep,
+ bool allow_preemption)
{
assert(replica != pg_whoami);
dout(10) << "scrub requesting scrubmap from osd." << replica
- << " deep " << (int)deep << " seed " << seed << dendl;
+ << " deep " << (int)deep << dendl;
MOSDRepScrub *repscrubop = new MOSDRepScrub(
spg_t(info.pgid.pgid, replica.shard), version,
get_osdmap()->get_epoch(),
get_last_peering_reset(),
- start, end, deep, seed);
+ start, end, deep,
+ allow_preemption,
+ scrubber.priority,
+ ops_blocked_by_scrub());
// default priority, we want the rep scrub processed prior to any recovery
// or client io messages (we are holding a lock!)
osd->send_message_osd_cluster(
{
hobject_t head;
SnapSet snapset;
+
+ // Test qa/standalone/scrub/osd-scrub-snaps.sh relies on this debug message
+ // to verify that callers go through clean_meta_map() and that it works
+ // properly, so keep the wording stable.
+ dout(20) << __func__ << " start" << dendl;
+
for (map<hobject_t, ScrubMap::object>::reverse_iterator i = smap.objects.rbegin();
i != smap.objects.rend();
++i) {
const hobject_t &hoid = i->first;
ScrubMap::object &o = i->second;
+ dout(20) << __func__ << " " << hoid << dendl;
+
if (hoid.is_head() || hoid.is_snapdir()) {
// parse the SnapSet
bufferlist bl;
}
}
}
-
-/*
- * build a scrub map over a chunk without releasing the lock
- * only used by chunky scrub
- */
+/*
+ * Build a scrub map over the chunk [start, end).
+ *
+ * The work is resumable: all progress (object listing, scan position,
+ * error state) lives in 'pos', so the caller can requeue the scrub and
+ * call again with the same arguments to continue where we left off.
+ *
+ * Returns:
+ *   -EINPROGRESS  more work remains; caller should requeue and retry
+ *   0             chunk complete; results accumulated into 'map'
+ *   <0            error from the object listing (also stored in pos.ret)
+ */
int PG::build_scrub_map_chunk(
ScrubMap &map,
- hobject_t start, hobject_t end, bool deep, uint32_t seed,
+ ScrubMapBuilder &pos,
+ hobject_t start,
+ hobject_t end,
+ bool deep,
ThreadPool::TPHandle &handle)
{
dout(10) << __func__ << " [" << start << "," << end << ") "
- << " seed " << seed << dendl;
-
- map.valid_through = info.last_update;
+ << " pos " << pos
+ << dendl;
- // objects
- vector<hobject_t> ls;
- vector<ghobject_t> rollback_obs;
- int ret = get_pgbackend()->objects_list_range(
- start,
- end,
- 0,
- &ls,
- &rollback_obs);
- if (ret < 0) {
- dout(5) << "objects_list_range error: " << ret << dendl;
- return ret;
+ // start: first call for this chunk (pos is empty) -- list the objects
+ // in range and record them in pos, then yield so scanning happens on
+ // a later call.
+ while (pos.empty()) {
+ pos.deep = deep;
+ map.valid_through = info.last_update;
+ osr->flush();
+
+ // objects
+ vector<ghobject_t> rollback_obs;
+ pos.ret = get_pgbackend()->objects_list_range(
+ start,
+ end,
+ 0,
+ &pos.ls,
+ &rollback_obs);
+ if (pos.ret < 0) {
+ dout(5) << "objects_list_range error: " << pos.ret << dendl;
+ return pos.ret;
+ }
+ if (pos.ls.empty()) {
+ // nothing in range: fall through to the finish stage
+ break;
+ }
+ _scan_rollback_obs(rollback_obs, handle);
+ pos.pos = 0;
+ // yield after listing; object scanning resumes on the next call
+ return -EINPROGRESS;
}
+ // scan objects: be_scan_list() advances pos and may yield mid-chunk
+ while (!pos.done()) {
+ int r = get_pgbackend()->be_scan_list(map, pos);
+ if (r == -EINPROGRESS) {
+ return r;
+ }
+ }
- get_pgbackend()->be_scan_list(map, ls, deep, seed, handle);
- _scan_rollback_obs(rollback_obs, handle);
- _scan_snaps(map);
+ // finish
+ dout(20) << __func__ << " finishing" << dendl;
+ assert(pos.done());
_repair_oinfo_oid(map);
+ if (!is_primary()) {
+ // NOTE(review): replicas scan snaps here against the cleaned
+ // (head/clone-grouped) meta map; the primary appears to do this in
+ // scrub_compare_maps() instead -- confirm against caller.
+ ScrubMap for_meta_scrub;
+ // In case we restarted smaller chunk, clear old data
+ scrubber.cleaned_meta_map.clear_from(scrubber.start);
+ scrubber.cleaned_meta_map.insert(map);
+ scrubber.clean_meta_map(for_meta_scrub);
+ _scan_snaps(for_meta_scrub);
+ }
- dout(20) << __func__ << " done" << dendl;
+ dout(20) << __func__ << " done, got " << map.objects.size() << " items"
+ << dendl;
return 0;
}
return;
}
- ScrubMap map;
-
assert(msg->chunky);
if (last_update_applied < msg->scrub_to) {
dout(10) << "waiting for last_update_applied to catch up" << dendl;
return;
}
- // compensate for hobject_t's with wrong pool from sloppy hammer OSDs
- hobject_t start = msg->start;
- hobject_t end = msg->end;
- if (!start.is_max())
- start.pool = info.pgid.pool();
- if (!end.is_max())
- end.pool = info.pgid.pool();
-
- build_scrub_map_chunk(
- map, start, end, msg->deep, msg->seed,
- handle);
-
- if (HAVE_FEATURE(acting_features, SERVER_LUMINOUS)) {
- MOSDRepScrubMap *reply = new MOSDRepScrubMap(
- spg_t(info.pgid.pgid, get_primary().shard),
- msg->map_epoch,
- pg_whoami);
- ::encode(map, reply->get_data());
- osd->send_message_osd_cluster(reply, msg->get_connection());
+ scrubber.state = Scrubber::BUILD_MAP_REPLICA;
+ scrubber.replica_scrub_start = msg->min_epoch;
+ scrubber.start = msg->start;
+ scrubber.end = msg->end;
+ scrubber.max_end = msg->end;
+ scrubber.deep = msg->deep;
+ scrubber.epoch_start = info.history.same_interval_since;
+ if (msg->priority) {
+ scrubber.priority = msg->priority;
} else {
- // for jewel compatibility
- vector<OSDOp> scrub(1);
- scrub[0].op.op = CEPH_OSD_OP_SCRUB_MAP;
- hobject_t poid;
- eversion_t v;
- osd_reqid_t reqid;
- MOSDSubOp *subop = new MOSDSubOp(
- reqid,
- pg_whoami,
- spg_t(info.pgid.pgid, get_primary().shard),
- poid,
- 0,
- msg->map_epoch,
- osd->get_tid(),
- v);
- ::encode(map, subop->get_data());
- subop->ops = scrub;
- osd->send_message_osd_cluster(subop, msg->get_connection());
+ scrubber.priority = get_scrub_priority();
}
+
+ scrub_can_preempt = msg->allow_preemption;
+ scrub_preempted = false;
+ scrubber.replica_scrubmap_pos.reset();
+
+ requeue_scrub(msg->high_priority);
}
/* Scrub:
scrub_queued = false;
scrubber.needs_sleep = true;
+ // for the replica
+ if (!is_primary() &&
+ scrubber.state == PG::Scrubber::BUILD_MAP_REPLICA) {
+ chunky_scrub(handle);
+ return;
+ }
+
if (!is_primary() || !is_active() || !is_clean() || !is_scrubbing()) {
dout(10) << "scrub -- not primary or active or not clean" << dendl;
state_clear(PG_STATE_SCRUBBING);
while (!done) {
dout(20) << "scrub state " << Scrubber::state_string(scrubber.state)
- << " [" << scrubber.start << "," << scrubber.end << ")" << dendl;
+ << " [" << scrubber.start << "," << scrubber.end << ")"
+ << " max_end " << scrubber.max_end << dendl;
switch (scrubber.state) {
case PG::Scrubber::INACTIVE:
dout(10) << "scrub start" << dendl;
+ assert(is_primary());
publish_stats_to_osd();
scrubber.epoch_start = info.history.same_interval_since;
osd->clog->debug(oss);
}
- scrubber.seed = -1;
-
+ scrubber.preempt_left = cct->_conf->get_val<uint64_t>(
+ "osd_scrub_max_preemptions");
+ scrubber.preempt_divisor = 1;
break;
case PG::Scrubber::NEW_CHUNK:
scrubber.primary_scrubmap = ScrubMap();
scrubber.received_maps.clear();
+ // begin (possible) preemption window
+ if (scrub_preempted) {
+ scrubber.preempt_left--;
+ scrubber.preempt_divisor *= 2;
+ dout(10) << __func__ << " preempted, " << scrubber.preempt_left
+ << " left" << dendl;
+ scrub_preempted = false;
+ }
+ scrub_can_preempt = scrubber.preempt_left > 0;
+
{
/* get the start and end of our scrub chunk
*
* left end of the range if we are a tier because they may legitimately
* not exist (see _scrub).
*/
- int min = MAX(3, cct->_conf->osd_scrub_chunk_min);
+ int min = std::max<int64_t>(3, cct->_conf->osd_scrub_chunk_min /
+ scrubber.preempt_divisor);
+ int max = std::max<int64_t>(min, cct->_conf->osd_scrub_chunk_max /
+ scrubber.preempt_divisor);
hobject_t start = scrubber.start;
hobject_t candidate_end;
vector<hobject_t> objects;
+ osr->flush();
ret = get_pgbackend()->objects_list_partial(
start,
min,
- MAX(min, cct->_conf->osd_scrub_chunk_max),
+ max,
&objects,
&candidate_end);
assert(ret >= 0);
break;
}
scrubber.end = candidate_end;
+ if (scrubber.end > scrubber.max_end)
+ scrubber.max_end = scrubber.end;
}
// walk the log to find the latest update that affects our chunk
// ask replicas to wait until
// last_update_applied >= scrubber.subset_last_update and then scan
scrubber.waiting_on_whom.insert(pg_whoami);
- ++scrubber.waiting_on;
// request maps from replicas
for (set<pg_shard_t>::iterator i = actingbackfill.begin();
if (*i == pg_whoami) continue;
_request_scrub_map(*i, scrubber.subset_last_update,
scrubber.start, scrubber.end, scrubber.deep,
- scrubber.seed);
+ scrubber.preempt_left > 0);
scrubber.waiting_on_whom.insert(*i);
- ++scrubber.waiting_on;
}
+ dout(10) << __func__ << " waiting_on_whom " << scrubber.waiting_on_whom
+ << dendl;
scrubber.state = PG::Scrubber::WAIT_PUSHES;
-
break;
case PG::Scrubber::WAIT_PUSHES:
break;
case PG::Scrubber::WAIT_LAST_UPDATE:
- if (last_update_applied >= scrubber.subset_last_update) {
- scrubber.state = PG::Scrubber::BUILD_MAP;
- } else {
+ if (last_update_applied < scrubber.subset_last_update) {
// will be requeued by op_applied
dout(15) << "wait for writes to flush" << dendl;
done = true;
- }
+ break;
+ }
+
+ scrubber.state = PG::Scrubber::BUILD_MAP;
+ scrubber.primary_scrubmap_pos.reset();
break;
case PG::Scrubber::BUILD_MAP:
assert(last_update_applied >= scrubber.subset_last_update);
// build my own scrub map
- ret = build_scrub_map_chunk(scrubber.primary_scrubmap,
- scrubber.start, scrubber.end,
- scrubber.deep, scrubber.seed,
- handle);
- if (ret < 0) {
- dout(5) << "error building scrub map: " << ret << ", aborting" << dendl;
+ if (scrub_preempted) {
+ dout(10) << __func__ << " preempted" << dendl;
+ scrubber.state = PG::Scrubber::BUILD_MAP_DONE;
+ break;
+ }
+ ret = build_scrub_map_chunk(
+ scrubber.primary_scrubmap,
+ scrubber.primary_scrubmap_pos,
+ scrubber.start, scrubber.end,
+ scrubber.deep,
+ handle);
+ if (ret == -EINPROGRESS) {
+ requeue_scrub();
+ done = true;
+ break;
+ }
+ scrubber.state = PG::Scrubber::BUILD_MAP_DONE;
+ break;
+
+ case PG::Scrubber::BUILD_MAP_DONE:
+ if (scrubber.primary_scrubmap_pos.ret < 0) {
+ dout(5) << "error: " << scrubber.primary_scrubmap_pos.ret
+ << ", aborting" << dendl;
scrub_clear_state();
scrub_unreserve_replicas();
return;
}
-
- --scrubber.waiting_on;
+ dout(10) << __func__ << " waiting_on_whom was "
+ << scrubber.waiting_on_whom << dendl;
+ assert(scrubber.waiting_on_whom.count(pg_whoami));
scrubber.waiting_on_whom.erase(pg_whoami);
scrubber.state = PG::Scrubber::WAIT_REPLICAS;
break;
case PG::Scrubber::WAIT_REPLICAS:
- if (scrubber.waiting_on > 0) {
+ if (!scrubber.waiting_on_whom.empty()) {
// will be requeued by sub_op_scrub_map
dout(10) << "wait for replicas to build scrub map" << dendl;
done = true;
- } else {
+ break;
+ }
+ // end (possible) preemption window
+ scrub_can_preempt = false;
+ if (scrub_preempted) {
+ dout(10) << __func__ << " preempted, restarting chunk" << dendl;
+ scrubber.state = PG::Scrubber::NEW_CHUNK;
+ } else {
scrubber.state = PG::Scrubber::COMPARE_MAPS;
}
break;
case PG::Scrubber::COMPARE_MAPS:
assert(last_update_applied >= scrubber.subset_last_update);
- assert(scrubber.waiting_on == 0);
+ assert(scrubber.waiting_on_whom.empty());
scrub_compare_maps();
scrubber.start = scrubber.end;
break;
}
+ scrubber.preempt_left = cct->_conf->get_val<uint64_t>(
+ "osd_scrub_max_preemptions");
+ scrubber.preempt_divisor = 1;
+
if (!(scrubber.end.is_max())) {
- scrubber.state = PG::Scrubber::NEW_CHUNK;
+ scrubber.state = PG::Scrubber::NEW_CHUNK;
requeue_scrub();
done = true;
} else {
break;
+ case PG::Scrubber::BUILD_MAP_REPLICA:
+ // build my own scrub map
+ if (scrub_preempted) {
+ dout(10) << __func__ << " preempted" << dendl;
+ ret = 0;
+ } else {
+ ret = build_scrub_map_chunk(
+ scrubber.replica_scrubmap,
+ scrubber.replica_scrubmap_pos,
+ scrubber.start, scrubber.end,
+ scrubber.deep,
+ handle);
+ }
+ if (ret == -EINPROGRESS) {
+ requeue_scrub();
+ done = true;
+ break;
+ }
+ // reply
+ if (HAVE_FEATURE(acting_features, SERVER_LUMINOUS)) {
+ MOSDRepScrubMap *reply = new MOSDRepScrubMap(
+ spg_t(info.pgid.pgid, get_primary().shard),
+ scrubber.replica_scrub_start,
+ pg_whoami);
+ reply->preempted = scrub_preempted;
+ ::encode(scrubber.replica_scrubmap, reply->get_data());
+ osd->send_message_osd_cluster(
+ get_primary().osd, reply,
+ scrubber.replica_scrub_start);
+ } else {
+ // for jewel compatibility
+ vector<OSDOp> scrub(1);
+ scrub[0].op.op = CEPH_OSD_OP_SCRUB_MAP;
+ hobject_t poid;
+ eversion_t v;
+ osd_reqid_t reqid;
+ MOSDSubOp *subop = new MOSDSubOp(
+ reqid,
+ pg_whoami,
+ spg_t(info.pgid.pgid, get_primary().shard),
+ poid,
+ 0,
+ scrubber.replica_scrub_start,
+ osd->get_tid(),
+ v);
+ ::encode(scrubber.replica_scrubmap, subop->get_data());
+ subop->ops = scrub;
+ osd->send_message_osd_cluster(
+ get_primary().osd, subop,
+ scrubber.replica_scrub_start);
+ }
+ scrub_preempted = false;
+ scrub_can_preempt = false;
+ scrubber.state = PG::Scrubber::INACTIVE;
+ scrubber.replica_scrubmap = ScrubMap();
+ scrubber.replica_scrubmap_pos = ScrubMapBuilder();
+ scrubber.start = hobject_t();
+ scrubber.end = hobject_t();
+ scrubber.max_end = hobject_t();
+ done = true;
+ break;
+
default:
ceph_abort();
}
}
dout(20) << "scrub final state " << Scrubber::state_string(scrubber.state)
- << " [" << scrubber.start << "," << scrubber.end << ")" << dendl;
+ << " [" << scrubber.start << "," << scrubber.end << ")"
+ << " max_end " << scrubber.max_end << dendl;
+}
+
+// Should a write to 'soid' block on the in-progress scrub?
+// Writes outside the active chunk [scrubber.start, scrubber.end) never
+// block.  If the scrub is currently preemptible, the write is allowed to
+// proceed and the scrub is flagged preempted instead of blocking the op.
+bool PG::write_blocked_by_scrub(const hobject_t& soid)
+{
+ if (soid < scrubber.start || soid >= scrubber.end) {
+ return false;
+ }
+ if (scrub_can_preempt) {
+ // preempt the scrub (once) rather than block the client write
+ if (!scrub_preempted) {
+ dout(10) << __func__ << " " << soid << " preempted" << dendl;
+ scrub_preempted = true;
+ } else {
+ dout(10) << __func__ << " " << soid << " already preempted" << dendl;
+ }
+ return false;
+ }
+ return true;
+}
+
+// True if the closed range [start, end] overlaps the scrubbed extent.
+// Checked against scrubber.max_end rather than scrubber.end: max_end is
+// the furthest chunk boundary used so far this scrub, so the check stays
+// conservative even if the current chunk was restarted with a smaller end.
+bool PG::range_intersects_scrub(const hobject_t &start, const hobject_t& end)
+{
+ // does [start, end] intersect [scrubber.start, scrubber.max_end)
+ return (start < scrubber.max_end &&
+ end >= scrubber.start);
+}
void PG::scrub_clear_state()
// construct authoritative scrub map for type specific scrubbing
scrubber.cleaned_meta_map.insert(scrubber.primary_scrubmap);
- map<hobject_t, pair<uint32_t, uint32_t>> missing_digest;
+ map<hobject_t,
+ pair<boost::optional<uint32_t>,
+ boost::optional<uint32_t>>> missing_digest;
+
+ map<pg_shard_t, ScrubMap *> maps;
+ maps[pg_whoami] = &scrubber.primary_scrubmap;
+
+ for (const auto& i : actingbackfill) {
+ if (i == pg_whoami) continue;
+ dout(2) << __func__ << " replica " << i << " has "
+ << scrubber.received_maps[i].objects.size()
+ << " items" << dendl;
+ maps[i] = &scrubber.received_maps[i];
+ }
+
+ set<hobject_t> master_set;
+
+ // Construct master set
+ for (const auto map : maps) {
+ for (const auto i : map.second->objects) {
+ master_set.insert(i.first);
+ }
+ }
+
+ stringstream ss;
+ get_pgbackend()->be_large_omap_check(maps, master_set,
+ scrubber.large_omap_objects, ss);
+ if (!ss.str().empty()) {
+ osd->clog->warn(ss);
+ }
if (acting.size() > 1) {
dout(10) << __func__ << " comparing replica scrub maps" << dendl;
- stringstream ss;
-
// Map from object with errors to good peer
map<hobject_t, list<pg_shard_t>> authoritative;
- map<pg_shard_t, ScrubMap *> maps;
dout(2) << __func__ << " osd." << acting[0] << " has "
<< scrubber.primary_scrubmap.objects.size() << " items" << dendl;
- maps[pg_whoami] = &scrubber.primary_scrubmap;
- for (set<pg_shard_t>::iterator i = actingbackfill.begin();
- i != actingbackfill.end();
- ++i) {
- if (*i == pg_whoami) continue;
- dout(2) << __func__ << " replica " << *i << " has "
- << scrubber.received_maps[*i].objects.size()
- << " items" << dendl;
- maps[*i] = &scrubber.received_maps[*i];
- }
+ ss.str("");
+ ss.clear();
get_pgbackend()->be_compare_scrubmaps(
maps,
+ master_set,
state_test(PG_STATE_REPAIR),
scrubber.missing,
scrubber.inconsistent,
}
ScrubMap for_meta_scrub;
- if (scrubber.end.is_max() ||
- scrubber.cleaned_meta_map.objects.empty()) {
- scrubber.cleaned_meta_map.swap(for_meta_scrub);
- } else {
- auto iter = scrubber.cleaned_meta_map.objects.end();
- --iter; // not empty, see if clause
- auto begin = scrubber.cleaned_meta_map.objects.begin();
- while (iter != begin) {
- auto next = iter--;
- if (next->first.get_head() != iter->first.get_head()) {
- ++iter;
- break;
- }
- }
- for_meta_scrub.objects.insert(begin, iter);
- scrubber.cleaned_meta_map.objects.erase(begin, iter);
- }
+ scrubber.clean_meta_map(for_meta_scrub);
// ok, do the pg-type specific scrubbing
scrub_snapshot_metadata(for_meta_scrub, missing_digest);
+ // _scan_snaps is called here on the primary; a non-primary calls it in
+ // build_scrub_map_chunk(), where it can use an authoritative (cleaned) map.
+ _scan_snaps(for_meta_scrub);
if (!scrubber.store->empty()) {
if (state_test(PG_STATE_REPAIR)) {
dout(10) << __func__ << ": discarding scrub results" << dendl;
info.history.last_clean_scrub_stamp = now;
info.stats.stats.sum.num_shallow_scrub_errors = scrubber.shallow_errors;
info.stats.stats.sum.num_deep_scrub_errors = scrubber.deep_errors;
+ info.stats.stats.sum.num_large_omap_objects = scrubber.large_omap_objects;
} else {
info.stats.stats.sum.num_shallow_scrub_errors = scrubber.shallow_errors;
// XXX: last_clean_scrub_stamp doesn't mean the pg is not inconsistent
assert(entries.begin()->version > info.last_update);
PGLogEntryHandler rollbacker{this, &t};
- if (roll_forward_to) {
- pg_log.roll_forward(&rollbacker);
- }
bool invalidate_stats =
pg_log.append_new_log_entries(info.last_backfill,
info.last_backfill_bitwise,
PG::RecoveryState::Recovering::react(const DeferRecovery &evt)
{
PG *pg = context< RecoveryMachine >().pg;
+ if (!pg->state_test(PG_STATE_RECOVERING)) {
+ // we may have finished recovery and have an AllReplicasRecovered
+ // event queued to move us to the next state.
+ ldout(pg->cct, 10) << "got defer recovery but not recovering" << dendl;
+ return discard_event();
+ }
ldout(pg->cct, 10) << "defer recovery, retry delay " << evt.delay << dendl;
pg->state_clear(PG_STATE_RECOVERING);
pg->state_set(PG_STATE_RECOVERY_WAIT);
q.f->dump_string("scrubber.state", Scrubber::state_string(pg->scrubber.state));
q.f->dump_stream("scrubber.start") << pg->scrubber.start;
q.f->dump_stream("scrubber.end") << pg->scrubber.end;
+ q.f->dump_stream("scrubber.max_end") << pg->scrubber.max_end;
q.f->dump_stream("scrubber.subset_last_update") << pg->scrubber.subset_last_update;
q.f->dump_bool("scrubber.deep", pg->scrubber.deep);
- q.f->dump_unsigned("scrubber.seed", pg->scrubber.seed);
- q.f->dump_int("scrubber.waiting_on", pg->scrubber.waiting_on);
{
q.f->open_array_section("scrubber.waiting_on_whom");
for (set<pg_shard_t>::iterator p = pg->scrubber.waiting_on_whom.begin();
int64_t poolnum = pg->info.pgid.pool();
// Reset if min_size turn smaller than previous value, pg might now be able to go active
- if (advmap.lastmap->get_pools().find(poolnum)->second.min_size >
+ if (!advmap.osdmap->have_pg_pool(poolnum) ||
+ advmap.lastmap->get_pools().find(poolnum)->second.min_size >
advmap.osdmap->get_pools().find(poolnum)->second.min_size) {
post_event(advmap);
return transit< Reset >();