- child->update_snap_mapper_bits(split_bits);
- child->update_osdmap_ref(get_osdmap());
-
- child->pool = pool;
-
- // Log
- pg_log.split_into(child_pgid, split_bits, &(child->pg_log));
- child->info.last_complete = info.last_complete;
-
- info.last_update = pg_log.get_head();
- child->info.last_update = child->pg_log.get_head();
-
- child->info.last_user_version = info.last_user_version;
-
- info.log_tail = pg_log.get_tail();
- child->info.log_tail = child->pg_log.get_tail();
-
- if (info.last_complete < pg_log.get_tail())
- info.last_complete = pg_log.get_tail();
- if (child->info.last_complete < child->pg_log.get_tail())
- child->info.last_complete = child->pg_log.get_tail();
-
- // Info
- child->info.history = info.history;
- child->info.history.epoch_created = get_osdmap_epoch();
- child->info.purged_snaps = info.purged_snaps;
-
- if (info.last_backfill.is_max()) {
- child->info.set_last_backfill(hobject_t::get_max());
- } else {
- // restart backfill on parent and child to be safe. we could
- // probably do better in the bitwise sort case, but it's more
- // fragile (there may be special work to do on backfill completion
- // in the future).
- info.set_last_backfill(hobject_t());
- child->info.set_last_backfill(hobject_t());
- // restarting backfill implies that the missing set is empty,
- // since it is only used for objects prior to last_backfill
- pg_log.reset_backfill();
- child->pg_log.reset_backfill();
- }
-
- child->info.stats = info.stats;
- child->info.stats.parent_split_bits = split_bits;
- info.stats.stats_invalid = true;
- child->info.stats.stats_invalid = true;
- child->info.last_epoch_started = info.last_epoch_started;
- child->info.last_interval_started = info.last_interval_started;
-
- child->snap_trimq = snap_trimq;
-
- // There can't be recovery/backfill going on now
- int primary, up_primary;
- vector<int> newup, newacting;
- get_osdmap()->pg_to_up_acting_osds(
- child->info.pgid.pgid, &newup, &up_primary, &newacting, &primary);
- child->init_primary_up_acting(
- newup,
- newacting,
- up_primary,
- primary);
- child->role = OSDMap::calc_pg_role(osd->whoami, child->acting);
-
- // this comparison includes primary rank via pg_shard_t
- if (get_primary() != child->get_primary())
- child->info.history.same_primary_since = get_osdmap_epoch();
-
- child->info.stats.up = up;
- child->info.stats.up_primary = up_primary;
- child->info.stats.acting = acting;
- child->info.stats.acting_primary = primary;
- child->info.stats.mapping_epoch = get_osdmap_epoch();
-
- // History
- child->past_intervals = past_intervals;
-
- _split_into(child_pgid, child, split_bits);
-
- // release all backoffs for simplicity
- release_backoffs(hobject_t(), hobject_t::get_max());
-
- child->on_new_interval();
-
- child->send_notify = !child->is_primary();
-
- child->dirty_info = true;
- child->dirty_big_info = true;
- dirty_info = true;
- dirty_big_info = true;
-}
-
-void PG::start_split_stats(const set<spg_t>& childpgs, vector<object_stat_sum_t> *out)
-{
- out->resize(childpgs.size() + 1);
- info.stats.stats.sum.split(*out);
-}
-
-void PG::finish_split_stats(const object_stat_sum_t& stats, ObjectStore::Transaction *t)
-{
- info.stats.stats.sum = stats;
- write_if_dirty(*t);
-}
-
-void PG::merge_from(map<spg_t,PGRef>& sources, RecoveryCtx *rctx,
- unsigned split_bits,
- const pg_merge_meta_t& last_pg_merge_meta)
-{
- dout(10) << __func__ << " from " << sources << " split_bits " << split_bits
- << dendl;
- bool incomplete = false;
- if (info.last_complete != info.last_update ||
- info.is_incomplete() ||
- info.dne()) {
- dout(10) << __func__ << " target incomplete" << dendl;
- incomplete = true;
- }
- if (last_pg_merge_meta.source_pgid != pg_t()) {
- if (info.pgid.pgid != last_pg_merge_meta.source_pgid.get_parent()) {
- dout(10) << __func__ << " target doesn't match expected parent "
- << last_pg_merge_meta.source_pgid.get_parent()
- << " of source_pgid " << last_pg_merge_meta.source_pgid
- << dendl;
- incomplete = true;
- }
- if (info.last_update != last_pg_merge_meta.target_version) {
- dout(10) << __func__ << " target version doesn't match expected "
- << last_pg_merge_meta.target_version << dendl;
- incomplete = true;
- }
- }
-
- PGLogEntryHandler handler{this, rctx->transaction};
- pg_log.roll_forward(&handler);
-
- info.last_complete = info.last_update; // to fake out trim()
- pg_log.reset_recovery_pointers();
- pg_log.trim(info.last_update, info);
-
- vector<PGLog*> log_from;
- for (auto& i : sources) {
- auto& source = i.second;
- if (!source) {
- dout(10) << __func__ << " source " << i.first << " missing" << dendl;
- incomplete = true;
- continue;
- }
- if (source->info.last_complete != source->info.last_update ||
- source->info.is_incomplete() ||
- source->info.dne()) {
- dout(10) << __func__ << " source " << source->pg_id << " incomplete"
- << dendl;
- incomplete = true;
- }
- if (last_pg_merge_meta.source_pgid != pg_t()) {
- if (source->info.pgid.pgid != last_pg_merge_meta.source_pgid) {
- dout(10) << __func__ << " source " << source->info.pgid.pgid
- << " doesn't match expected source pgid "
- << last_pg_merge_meta.source_pgid << dendl;
- incomplete = true;
- }
- if (source->info.last_update != last_pg_merge_meta.source_version) {
- dout(10) << __func__ << " source version doesn't match expected "
- << last_pg_merge_meta.target_version << dendl;
- incomplete = true;
- }
- }
-
- // prepare log
- PGLogEntryHandler handler{source.get(), rctx->transaction};
- source->pg_log.roll_forward(&handler);
- source->info.last_complete = source->info.last_update; // to fake out trim()
- source->pg_log.reset_recovery_pointers();
- source->pg_log.trim(source->info.last_update, source->info);
- log_from.push_back(&source->pg_log);
-
- // wipe out source's pgmeta
- rctx->transaction->remove(source->coll, source->pgmeta_oid);
-
- // merge (and destroy source collection)
- rctx->transaction->merge_collection(source->coll, coll, split_bits);
-
- // combine stats
- info.stats.add(source->info.stats);
-
- // pull up last_update
- info.last_update = std::max(info.last_update, source->info.last_update);
-
- // adopt source's PastIntervals if target has none. we can do this since
- // pgp_num has been reduced prior to the merge, so the OSD mappings for
- // the PGs are identical.
- if (past_intervals.empty() && !source->past_intervals.empty()) {
- dout(10) << __func__ << " taking source's past_intervals" << dendl;
- past_intervals = source->past_intervals;
- }
- }
-
- // merge_collection does this, but maybe all of our sources were missing.
- rctx->transaction->collection_set_bits(coll, split_bits);
-
- info.last_complete = info.last_update;
- info.log_tail = info.last_update;
- if (incomplete) {
- info.last_backfill = hobject_t();
- }
-
- snap_mapper.update_bits(split_bits);
-
- // merge logs
- pg_log.merge_from(log_from, info.last_update);
-
- // make sure we have a meaningful last_epoch_started/clean (if we were a
- // placeholder)
- if (info.last_epoch_started == 0) {
- // start with (a) source's history, since these PGs *should* have been
- // remapped in concert with each other...
- info.history = sources.begin()->second->info.history;
-
- // we use the last_epoch_{started,clean} we got from
- // the caller, which are the epochs that were reported by the PGs were
- // found to be ready for merge.
- info.history.last_epoch_clean = last_pg_merge_meta.last_epoch_clean;
- info.history.last_epoch_started = last_pg_merge_meta.last_epoch_started;
- info.last_epoch_started = last_pg_merge_meta.last_epoch_started;
- dout(10) << __func__
- << " set les/c to " << last_pg_merge_meta.last_epoch_started << "/"
- << last_pg_merge_meta.last_epoch_clean
- << " from pool last_dec_*, source pg history was "
- << sources.begin()->second->info.history
- << dendl;
-
- // if the past_intervals start is later than last_epoch_clean, it
- // implies the source repeered again but the target didn't, or
- // that the source became clean in a later epoch than the target.
- // avoid the discrepancy but adjusting the interval start
- // backwards to match so that check_past_interval_bounds() will
- // not complain.
- auto pib = past_intervals.get_bounds();
- if (info.history.last_epoch_clean < pib.first) {
- dout(10) << __func__ << " last_epoch_clean "
- << info.history.last_epoch_clean << " < past_interval start "
- << pib.first << ", adjusting start backwards" << dendl;
- past_intervals.adjust_start_backwards(info.history.last_epoch_clean);
- }
-
- // Similarly, if the same_interval_since value is later than
- // last_epoch_clean, the next interval change will result in a
- // past_interval start that is later than last_epoch_clean. This
- // can happen if we use the pg_history values from the merge
- // source. Adjust the same_interval_since value backwards if that
- // happens. (We trust the les and lec values more because they came from
- // the real target, whereas the history value we stole from the source.)
- if (info.history.last_epoch_started < info.history.same_interval_since) {
- dout(10) << __func__ << " last_epoch_started "
- << info.history.last_epoch_started << " < same_interval_since "
- << info.history.same_interval_since
- << ", adjusting pg_history backwards" << dendl;
- info.history.same_interval_since = info.history.last_epoch_clean;
- // make sure same_{up,primary}_since are <= same_interval_since
- info.history.same_up_since = std::min(
- info.history.same_up_since, info.history.same_interval_since);
- info.history.same_primary_since = std::min(
- info.history.same_primary_since, info.history.same_interval_since);
- }
- }
-
- dirty_info = true;
- dirty_big_info = true;
-}
-
-void PG::add_backoff(SessionRef s, const hobject_t& begin, const hobject_t& end)
-{
- ConnectionRef con = s->con;
- if (!con) // OSD::ms_handle_reset clears s->con without a lock
- return;
- BackoffRef b(s->have_backoff(info.pgid, begin));
- if (b) {
- derr << __func__ << " already have backoff for " << s << " begin " << begin
- << " " << *b << dendl;
- ceph_abort();
- }
- std::lock_guard l(backoff_lock);
- {
- b = new Backoff(info.pgid, this, s, ++s->backoff_seq, begin, end);
- backoffs[begin].insert(b);
- s->add_backoff(b);
- dout(10) << __func__ << " session " << s << " added " << *b << dendl;
- }
- con->send_message(
- new MOSDBackoff(
- info.pgid,
- get_osdmap_epoch(),
- CEPH_OSD_BACKOFF_OP_BLOCK,
- b->id,
- begin,
- end));
-}
-
-void PG::release_backoffs(const hobject_t& begin, const hobject_t& end)
-{
- dout(10) << __func__ << " [" << begin << "," << end << ")" << dendl;
- vector<BackoffRef> bv;
- {
- std::lock_guard l(backoff_lock);
- auto p = backoffs.lower_bound(begin);
- while (p != backoffs.end()) {
- int r = cmp(p->first, end);
- dout(20) << __func__ << " ? " << r << " " << p->first
- << " " << p->second << dendl;
- // note: must still examine begin=end=p->first case
- if (r > 0 || (r == 0 && begin < end)) {
- break;
- }
- dout(20) << __func__ << " checking " << p->first
- << " " << p->second << dendl;
- auto q = p->second.begin();
- while (q != p->second.end()) {
- dout(20) << __func__ << " checking " << *q << dendl;
- int r = cmp((*q)->begin, begin);
- if (r == 0 || (r > 0 && (*q)->end < end)) {
- bv.push_back(*q);
- q = p->second.erase(q);
- } else {
- ++q;
- }
- }
- if (p->second.empty()) {
- p = backoffs.erase(p);
- } else {
- ++p;
- }
- }
- }
- for (auto b : bv) {
- std::lock_guard l(b->lock);
- dout(10) << __func__ << " " << *b << dendl;
- if (b->session) {
- ceph_assert(b->pg == this);
- ConnectionRef con = b->session->con;
- if (con) { // OSD::ms_handle_reset clears s->con without a lock
- con->send_message(
- new MOSDBackoff(
- info.pgid,
- get_osdmap_epoch(),
- CEPH_OSD_BACKOFF_OP_UNBLOCK,
- b->id,
- b->begin,
- b->end));
- }
- if (b->is_new()) {
- b->state = Backoff::STATE_DELETING;
- } else {
- b->session->rm_backoff(b);
- b->session.reset();
- }
- b->pg.reset();
- }
- }
-}
-
-void PG::clear_backoffs()
-{
- dout(10) << __func__ << " " << dendl;
- map<hobject_t,set<BackoffRef>> ls;
- {
- std::lock_guard l(backoff_lock);
- ls.swap(backoffs);
- }
- for (auto& p : ls) {
- for (auto& b : p.second) {
- std::lock_guard l(b->lock);
- dout(10) << __func__ << " " << *b << dendl;
- if (b->session) {
- ceph_assert(b->pg == this);
- if (b->is_new()) {
- b->state = Backoff::STATE_DELETING;
- } else {
- b->session->rm_backoff(b);
- b->session.reset();
- }
- b->pg.reset();
- }
- }
- }
-}
-
-// called by Session::clear_backoffs()
-void PG::rm_backoff(BackoffRef b)
-{
- dout(10) << __func__ << " " << *b << dendl;
- std::lock_guard l(backoff_lock);
- ceph_assert(b->lock.is_locked_by_me());
- ceph_assert(b->pg == this);
- auto p = backoffs.find(b->begin);
- // may race with release_backoffs()
- if (p != backoffs.end()) {
- auto q = p->second.find(b);
- if (q != p->second.end()) {
- p->second.erase(q);
- if (p->second.empty()) {
- backoffs.erase(p);
- }
- }
- }
-}
-
-void PG::clear_recovery_state()
-{
- dout(10) << "clear_recovery_state" << dendl;
-
- pg_log.reset_recovery_pointers();
- finish_sync_event = 0;
-
- hobject_t soid;
- while (recovery_ops_active > 0) {
-#ifdef DEBUG_RECOVERY_OIDS
- soid = *recovering_oids.begin();
-#endif
- finish_recovery_op(soid, true);
- }
-
- async_recovery_targets.clear();
- backfill_targets.clear();
- backfill_info.clear();
- peer_backfill_info.clear();
- waiting_on_backfill.clear();
- _clear_recovery_state(); // pg impl specific hook
-}
-
-void PG::cancel_recovery()
-{
- dout(10) << "cancel_recovery" << dendl;
- clear_recovery_state();
-}
-
-
-void PG::purge_strays()
-{
- if (is_premerge()) {
- dout(10) << "purge_strays " << stray_set << " but premerge, doing nothing"
- << dendl;
- return;
- }
- if (cct->_conf.get_val<bool>("osd_debug_no_purge_strays")) {
- return;
- }
- dout(10) << "purge_strays " << stray_set << dendl;
-
- bool removed = false;
- for (set<pg_shard_t>::iterator p = stray_set.begin();
- p != stray_set.end();
- ++p) {
- ceph_assert(!is_acting_recovery_backfill(*p));
- if (get_osdmap()->is_up(p->osd)) {
- dout(10) << "sending PGRemove to osd." << *p << dendl;
- vector<spg_t> to_remove;
- to_remove.push_back(spg_t(info.pgid.pgid, p->shard));
- MOSDPGRemove *m = new MOSDPGRemove(
- get_osdmap_epoch(),
- to_remove);
- osd->send_message_osd_cluster(p->osd, m, get_osdmap_epoch());
- } else {
- dout(10) << "not sending PGRemove to down osd." << *p << dendl;
- }
- peer_missing.erase(*p);
- peer_info.erase(*p);
- peer_purged.insert(*p);
- removed = true;
- }
-
- // if we removed anyone, update peers (which include peer_info)
- if (removed)
- update_heartbeat_peers();
-
- stray_set.clear();
-
- // clear _requested maps; we may have to peer() again if we discover
- // (more) stray content
- peer_log_requested.clear();
- peer_missing_requested.clear();
-}
-
-void PG::set_probe_targets(const set<pg_shard_t> &probe_set)
-{
- std::lock_guard l(heartbeat_peer_lock);
- probe_targets.clear();
- for (set<pg_shard_t>::iterator i = probe_set.begin();
- i != probe_set.end();
- ++i) {
- probe_targets.insert(i->osd);
- }
-}
-
-void PG::clear_probe_targets()
-{
- std::lock_guard l(heartbeat_peer_lock);
- probe_targets.clear();
-}
-
-void PG::update_heartbeat_peers()
-{
- ceph_assert(is_locked());
-
- if (!is_primary())
- return;
-
- set<int> new_peers;
- for (unsigned i=0; i<acting.size(); i++) {
- if (acting[i] != CRUSH_ITEM_NONE)
- new_peers.insert(acting[i]);
- }
- for (unsigned i=0; i<up.size(); i++) {
- if (up[i] != CRUSH_ITEM_NONE)
- new_peers.insert(up[i]);
- }
- for (map<pg_shard_t,pg_info_t>::iterator p = peer_info.begin();
- p != peer_info.end();
- ++p)
- new_peers.insert(p->first.osd);
-
- bool need_update = false;
- heartbeat_peer_lock.Lock();
- if (new_peers == heartbeat_peers) {
- dout(10) << "update_heartbeat_peers " << heartbeat_peers << " unchanged" << dendl;
- } else {
- dout(10) << "update_heartbeat_peers " << heartbeat_peers << " -> " << new_peers << dendl;
- heartbeat_peers.swap(new_peers);
- need_update = true;
- }
- heartbeat_peer_lock.Unlock();
-
- if (need_update)
- osd->need_heartbeat_peer_update();
-}
-
-
-bool PG::check_in_progress_op(
- const osd_reqid_t &r,
- eversion_t *version,
- version_t *user_version,
- int *return_code) const
-{
- return (
- projected_log.get_request(r, version, user_version, return_code) ||
- pg_log.get_log().get_request(r, version, user_version, return_code));
-}
-
-static bool find_shard(const set<pg_shard_t> & pgs, shard_id_t shard)
-{
- for (auto&p : pgs)
- if (p.shard == shard)
- return true;
- return false;
-}
-
-static pg_shard_t get_another_shard(const set<pg_shard_t> & pgs, pg_shard_t skip, shard_id_t shard)
-{
- for (auto&p : pgs) {
- if (p == skip)
- continue;
- if (p.shard == shard)
- return p;
- }
- return pg_shard_t();
-}
-
-void PG::_update_calc_stats()
-{
- info.stats.version = info.last_update;
- info.stats.created = info.history.epoch_created;
- info.stats.last_scrub = info.history.last_scrub;
- info.stats.last_scrub_stamp = info.history.last_scrub_stamp;
- info.stats.last_deep_scrub = info.history.last_deep_scrub;
- info.stats.last_deep_scrub_stamp = info.history.last_deep_scrub_stamp;
- info.stats.last_clean_scrub_stamp = info.history.last_clean_scrub_stamp;
- info.stats.last_epoch_clean = info.history.last_epoch_clean;
-
- info.stats.log_size = pg_log.get_head().version - pg_log.get_tail().version;
- info.stats.ondisk_log_size = info.stats.log_size;
- info.stats.log_start = pg_log.get_tail();
- info.stats.ondisk_log_start = pg_log.get_tail();
- info.stats.snaptrimq_len = snap_trimq.size();
-
- unsigned num_shards = get_osdmap()->get_pg_size(info.pgid.pgid);
-
- // In rare case that upset is too large (usually transient), use as target
- // for calculations below.
- unsigned target = std::max(num_shards, (unsigned)upset.size());
- // For undersized actingset may be larger with OSDs out
- unsigned nrep = std::max(actingset.size(), upset.size());
- // calc num_object_copies
- info.stats.stats.calc_copies(std::max(target, nrep));
- info.stats.stats.sum.num_objects_degraded = 0;
- info.stats.stats.sum.num_objects_unfound = 0;
- info.stats.stats.sum.num_objects_misplaced = 0;
-
- if ((is_remapped() || is_undersized() || !is_clean()) && (is_peered() || is_activating())) {
- dout(20) << __func__ << " actingset " << actingset << " upset "
- << upset << " acting_recovery_backfill " << acting_recovery_backfill << dendl;
- dout(20) << __func__ << " acting " << acting << " up " << up << dendl;
-
- ceph_assert(!acting_recovery_backfill.empty());
-
- bool estimate = false;
-
- // NOTE: we only generate degraded, misplaced and unfound
- // values for the summation, not individual stat categories.
- int64_t num_objects = info.stats.stats.sum.num_objects;
-
- // Objects missing from up nodes, sorted by # objects.
- boost::container::flat_set<pair<int64_t,pg_shard_t>> missing_target_objects;
- // Objects missing from nodes not in up, sort by # objects
- boost::container::flat_set<pair<int64_t,pg_shard_t>> acting_source_objects;
-
- // Fill missing_target_objects/acting_source_objects
-
- {
- int64_t missing;
-
- // Primary first
- missing = pg_log.get_missing().num_missing();
- ceph_assert(acting_recovery_backfill.count(pg_whoami));
- if (upset.count(pg_whoami)) {
- missing_target_objects.insert(make_pair(missing, pg_whoami));
- } else {
- acting_source_objects.insert(make_pair(missing, pg_whoami));
- }
- info.stats.stats.sum.num_objects_missing_on_primary = missing;
- dout(20) << __func__ << " shard " << pg_whoami
- << " primary objects " << num_objects
- << " missing " << missing
- << dendl;
- }
-
- // All other peers
- for (auto& peer : peer_info) {
- // Primary should not be in the peer_info, skip if it is.
- if (peer.first == pg_whoami) continue;
- int64_t missing = 0;
- int64_t peer_num_objects = peer.second.stats.stats.sum.num_objects;
- // Backfill targets always track num_objects accurately
- // all other peers track missing accurately.
- if (is_backfill_targets(peer.first)) {
- missing = std::max((int64_t)0, num_objects - peer_num_objects);
- } else {
- if (peer_missing.count(peer.first)) {
- missing = peer_missing[peer.first].num_missing();
- } else {
- dout(20) << __func__ << " no peer_missing found for " << peer.first << dendl;
- if (is_recovering()) {
- estimate = true;
- }
- missing = std::max((int64_t)0, num_objects - peer_num_objects);
- }
- }
- if (upset.count(peer.first)) {
- missing_target_objects.insert(make_pair(missing, peer.first));
- } else if (actingset.count(peer.first)) {
- acting_source_objects.insert(make_pair(missing, peer.first));
- }
- peer.second.stats.stats.sum.num_objects_missing = missing;
- dout(20) << __func__ << " shard " << peer.first
- << " objects " << peer_num_objects
- << " missing " << missing
- << dendl;
- }
-
- // A misplaced object is not stored on the correct OSD
- int64_t misplaced = 0;
- // a degraded objects has fewer replicas or EC shards than the pool specifies.
- int64_t degraded = 0;
-
- if (is_recovering()) {
- for (auto& sml: missing_loc.get_missing_by_count()) {
- for (auto& ml: sml.second) {
- int missing_shards;
- if (sml.first == shard_id_t::NO_SHARD) {
- dout(20) << __func__ << " ml " << ml.second << " upset size " << upset.size() << " up " << ml.first.up << dendl;
- missing_shards = (int)upset.size() - ml.first.up;
- } else {
- // Handle shards not even in upset below
- if (!find_shard(upset, sml.first))
- continue;
- missing_shards = std::max(0, 1 - ml.first.up);
- dout(20) << __func__ << " shard " << sml.first << " ml " << ml.second << " missing shards " << missing_shards << dendl;
- }
- int odegraded = ml.second * missing_shards;
- // Copies on other osds but limited to the possible degraded
- int more_osds = std::min(missing_shards, ml.first.other);
- int omisplaced = ml.second * more_osds;
- ceph_assert(omisplaced <= odegraded);
- odegraded -= omisplaced;
-
- misplaced += omisplaced;
- degraded += odegraded;
- }
- }
-
- dout(20) << __func__ << " missing based degraded " << degraded << dendl;
- dout(20) << __func__ << " missing based misplaced " << misplaced << dendl;
-
- // Handle undersized case
- if (pool.info.is_replicated()) {
- // Add degraded for missing targets (num_objects missing)
- ceph_assert(target >= upset.size());
- unsigned needed = target - upset.size();
- degraded += num_objects * needed;
- } else {
- for (unsigned i = 0 ; i < num_shards; ++i) {
- shard_id_t shard(i);
-
- if (!find_shard(upset, shard)) {
- pg_shard_t pgs = get_another_shard(actingset, pg_shard_t(), shard);
-
- if (pgs != pg_shard_t()) {
- int64_t missing;
-
- if (pgs == pg_whoami)
- missing = info.stats.stats.sum.num_objects_missing_on_primary;
- else
- missing = peer_info[pgs].stats.stats.sum.num_objects_missing;
-
- degraded += missing;
- misplaced += std::max((int64_t)0, num_objects - missing);
- } else {
- // No shard anywhere
- degraded += num_objects;
- }
- }
- }
- }
- goto out;
- }
-
- // Handle undersized case
- if (pool.info.is_replicated()) {
- // Add to missing_target_objects
- ceph_assert(target >= missing_target_objects.size());
- unsigned needed = target - missing_target_objects.size();
- if (needed)
- missing_target_objects.insert(make_pair(num_objects * needed, pg_shard_t(pg_shard_t::NO_OSD)));
- } else {
- for (unsigned i = 0 ; i < num_shards; ++i) {
- shard_id_t shard(i);
- bool found = false;
- for (const auto& t : missing_target_objects) {
- if (std::get<1>(t).shard == shard) {
- found = true;
- break;
- }
- }
- if (!found)
- missing_target_objects.insert(make_pair(num_objects, pg_shard_t(pg_shard_t::NO_OSD,shard)));
- }
- }
-
- for (const auto& item : missing_target_objects)
- dout(20) << __func__ << " missing shard " << std::get<1>(item) << " missing= " << std::get<0>(item) << dendl;
- for (const auto& item : acting_source_objects)
- dout(20) << __func__ << " acting shard " << std::get<1>(item) << " missing= " << std::get<0>(item) << dendl;
-
- // Handle all objects not in missing for remapped
- // or backfill
- for (auto m = missing_target_objects.rbegin();
- m != missing_target_objects.rend(); ++m) {
-
- int64_t extra_missing = -1;
-
- if (pool.info.is_replicated()) {
- if (!acting_source_objects.empty()) {
- auto extra_copy = acting_source_objects.begin();
- extra_missing = std::get<0>(*extra_copy);
- acting_source_objects.erase(extra_copy);
- }
- } else { // Erasure coded
- // Use corresponding shard
- for (const auto& a : acting_source_objects) {
- if (std::get<1>(a).shard == std::get<1>(*m).shard) {
- extra_missing = std::get<0>(a);
- acting_source_objects.erase(a);
- break;
- }
- }
- }
-
- if (extra_missing >= 0 && std::get<0>(*m) >= extra_missing) {
- // We don't know which of the objects on the target
- // are part of extra_missing so assume are all degraded.
- misplaced += std::get<0>(*m) - extra_missing;
- degraded += extra_missing;
- } else {
- // 1. extra_missing == -1, more targets than sources so degraded
- // 2. extra_missing > std::get<0>(m), so that we know that some extra_missing
- // previously degraded are now present on the target.
- degraded += std::get<0>(*m);
- }
- }
- // If there are still acting that haven't been accounted for
- // then they are misplaced
- for (const auto& a : acting_source_objects) {
- int64_t extra_misplaced = std::max((int64_t)0, num_objects - std::get<0>(a));
- dout(20) << __func__ << " extra acting misplaced " << extra_misplaced << dendl;
- misplaced += extra_misplaced;
- }
-out:
- // NOTE: Tests use these messages to verify this code
- dout(20) << __func__ << " degraded " << degraded << (estimate ? " (est)": "") << dendl;
- dout(20) << __func__ << " misplaced " << misplaced << (estimate ? " (est)": "")<< dendl;
-
- info.stats.stats.sum.num_objects_degraded = degraded;
- info.stats.stats.sum.num_objects_unfound = get_num_unfound();
- info.stats.stats.sum.num_objects_misplaced = misplaced;
- }
-}
-
-void PG::_update_blocked_by()
-{
- // set a max on the number of blocking peers we report. if we go
- // over, report a random subset. keep the result sorted.
- unsigned keep = std::min<unsigned>(blocked_by.size(), cct->_conf->osd_max_pg_blocked_by);
- unsigned skip = blocked_by.size() - keep;
- info.stats.blocked_by.clear();
- info.stats.blocked_by.resize(keep);
- unsigned pos = 0;
- for (set<int>::iterator p = blocked_by.begin();
- p != blocked_by.end() && keep > 0;
- ++p) {
- if (skip > 0 && (rand() % (skip + keep) < skip)) {
- --skip;
- } else {
- info.stats.blocked_by[pos++] = *p;
- --keep;
- }
- }
-}
-
-void PG::publish_stats_to_osd()
-{
- if (!is_primary())
- return;
-
- pg_stats_publish_lock.Lock();
-
- if (info.stats.stats.sum.num_scrub_errors)
- state_set(PG_STATE_INCONSISTENT);
- else {
- state_clear(PG_STATE_INCONSISTENT);
- state_clear(PG_STATE_FAILED_REPAIR);
- }
-
- utime_t now = ceph_clock_now();
- if (info.stats.state != state) {
- info.stats.last_change = now;
- // Optimistic estimation, if we just find out an inactive PG,
- // assumt it is active till now.
- if (!(state & PG_STATE_ACTIVE) &&
- (info.stats.state & PG_STATE_ACTIVE))
- info.stats.last_active = now;
-
- if ((state & PG_STATE_ACTIVE) &&
- !(info.stats.state & PG_STATE_ACTIVE))
- info.stats.last_became_active = now;
- if ((state & (PG_STATE_ACTIVE|PG_STATE_PEERED)) &&
- !(info.stats.state & (PG_STATE_ACTIVE|PG_STATE_PEERED)))
- info.stats.last_became_peered = now;
- info.stats.state = state;
- }
-
- _update_calc_stats();
- if (info.stats.stats.sum.num_objects_degraded) {
- state_set(PG_STATE_DEGRADED);
- } else {
- state_clear(PG_STATE_DEGRADED);
- }
- _update_blocked_by();
-
- pg_stat_t pre_publish = info.stats;
- pre_publish.stats.add(unstable_stats);
- utime_t cutoff = now;
- cutoff -= cct->_conf->osd_pg_stat_report_interval_max;
-
- if (get_osdmap()->require_osd_release >= CEPH_RELEASE_MIMIC) {
- // share (some of) our purged_snaps via the pg_stats. limit # of intervals
- // because we don't want to make the pg_stat_t structures too expensive.
- unsigned max = cct->_conf->osd_max_snap_prune_intervals_per_epoch;
- unsigned num = 0;
- auto i = info.purged_snaps.begin();
- while (num < max && i != info.purged_snaps.end()) {
- pre_publish.purged_snaps.insert(i.get_start(), i.get_len());
- ++num;
- ++i;
- }
- dout(20) << __func__ << " reporting purged_snaps "
- << pre_publish.purged_snaps << dendl;
- }
-
- if (pg_stats_publish_valid && pre_publish == pg_stats_publish &&
- info.stats.last_fresh > cutoff) {
- dout(15) << "publish_stats_to_osd " << pg_stats_publish.reported_epoch
- << ": no change since " << info.stats.last_fresh << dendl;
- } else {
- // update our stat summary and timestamps
- info.stats.reported_epoch = get_osdmap_epoch();
- ++info.stats.reported_seq;
-
- info.stats.last_fresh = now;
-
- if (info.stats.state & PG_STATE_CLEAN)
- info.stats.last_clean = now;
- if (info.stats.state & PG_STATE_ACTIVE)
- info.stats.last_active = now;
- if (info.stats.state & (PG_STATE_ACTIVE|PG_STATE_PEERED))
- info.stats.last_peered = now;
- info.stats.last_unstale = now;
- if ((info.stats.state & PG_STATE_DEGRADED) == 0)
- info.stats.last_undegraded = now;
- if ((info.stats.state & PG_STATE_UNDERSIZED) == 0)
- info.stats.last_fullsized = now;
-
- pg_stats_publish_valid = true;
- pg_stats_publish = pre_publish;
-
- dout(15) << "publish_stats_to_osd " << pg_stats_publish.reported_epoch
- << ":" << pg_stats_publish.reported_seq << dendl;
- }
- pg_stats_publish_lock.Unlock();
-}
-
-void PG::clear_publish_stats()
-{
- dout(15) << "clear_stats" << dendl;
- pg_stats_publish_lock.Lock();
- pg_stats_publish_valid = false;
- pg_stats_publish_lock.Unlock();
-}
-
-/**
- * initialize a newly instantiated pg
- *
- * Initialize PG state, as when a PG is initially created, or when it
- * is first instantiated on the current node.
- *
- * @param role our role/rank
- * @param newup up set
- * @param newacting acting set
- * @param history pg history
- * @param pi past_intervals
- * @param backfill true if info should be marked as backfill
- * @param t transaction to write out our new state in
- */
-void PG::init(
- int role,
- const vector<int>& newup, int new_up_primary,
- const vector<int>& newacting, int new_acting_primary,
- const pg_history_t& history,
- const PastIntervals& pi,
- bool backfill,
- ObjectStore::Transaction *t)
-{
- dout(10) << "init role " << role << " up " << newup << " acting " << newacting
- << " history " << history
- << " past_intervals " << pi
- << dendl;
-
- set_role(role);
- init_primary_up_acting(
- newup,
- newacting,
- new_up_primary,
- new_acting_primary);
-
- info.history = history;
- past_intervals = pi;
-
- info.stats.up = up;
- info.stats.up_primary = new_up_primary;
- info.stats.acting = acting;
- info.stats.acting_primary = new_acting_primary;
- info.stats.mapping_epoch = info.history.same_interval_since;
-
- if (backfill) {
- dout(10) << __func__ << ": Setting backfill" << dendl;
- info.set_last_backfill(hobject_t());
- info.last_complete = info.last_update;
- pg_log.mark_log_for_rewrite();
- }
-
- on_new_interval();
-
- dirty_info = true;
- dirty_big_info = true;
- write_if_dirty(*t);
-}
-
-void PG::shutdown()
-{
- ch->flush();
- lock();
- on_shutdown();
- unlock();
-}
-
-#pragma GCC diagnostic ignored "-Wpragmas"
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
-
-void PG::upgrade(ObjectStore *store)
-{
- dout(0) << __func__ << " " << info_struct_v << " -> " << latest_struct_v
- << dendl;
- ceph_assert(info_struct_v <= 10);
- ObjectStore::Transaction t;
-
- // <do upgrade steps here>
-
- // finished upgrade!
- ceph_assert(info_struct_v == 10);
-
- // update infover_key
- if (info_struct_v < latest_struct_v) {
- map<string,bufferlist> v;
- __u8 ver = latest_struct_v;
- encode(ver, v[infover_key]);
- t.omap_setkeys(coll, pgmeta_oid, v);
- }
-
- dirty_info = true;
- dirty_big_info = true;
- write_if_dirty(t);
-
- ObjectStore::CollectionHandle ch = store->open_collection(coll);
- int r = store->queue_transaction(ch, std::move(t));
- if (r != 0) {
- derr << __func__ << ": queue_transaction returned "
- << cpp_strerror(r) << dendl;
- ceph_abort();
- }
- ceph_assert(r == 0);
-
- C_SaferCond waiter;
- if (!ch->flush_commit(&waiter)) {
- waiter.wait();
- }
-}
-
-#pragma GCC diagnostic pop
-#pragma GCC diagnostic warning "-Wpragmas"
-
-int PG::_prepare_write_info(CephContext* cct,
- map<string,bufferlist> *km,
- epoch_t epoch,
- pg_info_t &info, pg_info_t &last_written_info,
- PastIntervals &past_intervals,
- bool dirty_big_info,
- bool dirty_epoch,
- bool try_fast_info,
- PerfCounters *logger)
-{
- if (dirty_epoch) {
- encode(epoch, (*km)[epoch_key]);
- }
-
- if (logger)
- logger->inc(l_osd_pg_info);
-
- // try to do info efficiently?
- if (!dirty_big_info && try_fast_info &&
- info.last_update > last_written_info.last_update) {
- pg_fast_info_t fast;
- fast.populate_from(info);
- bool did = fast.try_apply_to(&last_written_info);
- ceph_assert(did); // we verified last_update increased above
- if (info == last_written_info) {
- encode(fast, (*km)[fastinfo_key]);
- if (logger)
- logger->inc(l_osd_pg_fastinfo);
- return 0;
- }
- generic_dout(30) << __func__ << " fastinfo failed, info:\n";
- {
- JSONFormatter jf(true);
- jf.dump_object("info", info);
- jf.flush(*_dout);
- }
- {
- *_dout << "\nlast_written_info:\n";
- JSONFormatter jf(true);
- jf.dump_object("last_written_info", last_written_info);
- jf.flush(*_dout);
- }
- *_dout << dendl;
- }
- last_written_info = info;
-
- // info. store purged_snaps separately.
- interval_set<snapid_t> purged_snaps;
- purged_snaps.swap(info.purged_snaps);
- encode(info, (*km)[info_key]);
- purged_snaps.swap(info.purged_snaps);
-
- if (dirty_big_info) {
- // potentially big stuff
- bufferlist& bigbl = (*km)[biginfo_key];
- encode(past_intervals, bigbl);
- encode(info.purged_snaps, bigbl);
- //dout(20) << "write_info bigbl " << bigbl.length() << dendl;
- if (logger)
- logger->inc(l_osd_pg_biginfo);
- }
-
- return 0;
-}
-
-void PG::_create(ObjectStore::Transaction& t, spg_t pgid, int bits)
-{
- coll_t coll(pgid);
- t.create_collection(coll, bits);
-}
-
-void PG::_init(ObjectStore::Transaction& t, spg_t pgid, const pg_pool_t *pool)
-{
- coll_t coll(pgid);
-
- if (pool) {
- // Give a hint to the PG collection
- bufferlist hint;
- uint32_t pg_num = pool->get_pg_num();
- uint64_t expected_num_objects_pg = pool->expected_num_objects / pg_num;
- encode(pg_num, hint);
- encode(expected_num_objects_pg, hint);
- uint32_t hint_type = ObjectStore::Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS;
- t.collection_hint(coll, hint_type, hint);
- }
-
- ghobject_t pgmeta_oid(pgid.make_pgmeta_oid());
- t.touch(coll, pgmeta_oid);
- map<string,bufferlist> values;
- __u8 struct_v = latest_struct_v;
- encode(struct_v, values[infover_key]);
- t.omap_setkeys(coll, pgmeta_oid, values);
-}
-
-void PG::prepare_write_info(map<string,bufferlist> *km)
-{
- info.stats.stats.add(unstable_stats);
- unstable_stats.clear();
-
- bool need_update_epoch = last_epoch < get_osdmap_epoch();
- int ret = _prepare_write_info(cct, km, get_osdmap_epoch(),
- info,
- last_written_info,
- past_intervals,
- dirty_big_info, need_update_epoch,
- cct->_conf->osd_fast_info,
- osd->logger);
- ceph_assert(ret == 0);
- if (need_update_epoch)
- last_epoch = get_osdmap_epoch();
- last_persisted_osdmap = last_epoch;
-
- dirty_info = false;
- dirty_big_info = false;
-}
-
-#pragma GCC diagnostic ignored "-Wpragmas"
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
-
-bool PG::_has_removal_flag(ObjectStore *store,
- spg_t pgid)
-{
- coll_t coll(pgid);
- ghobject_t pgmeta_oid(pgid.make_pgmeta_oid());
-
- // first try new way
- set<string> keys;
- keys.insert("_remove");
- map<string,bufferlist> values;
- auto ch = store->open_collection(coll);
- ceph_assert(ch);
- if (store->omap_get_values(ch, pgmeta_oid, keys, &values) == 0 &&
- values.size() == 1)
- return true;
-
- return false;
-}
-
-int PG::peek_map_epoch(ObjectStore *store,
- spg_t pgid,
- epoch_t *pepoch)
-{
- coll_t coll(pgid);
- ghobject_t legacy_infos_oid(OSD::make_infos_oid());
- ghobject_t pgmeta_oid(pgid.make_pgmeta_oid());
- epoch_t cur_epoch = 0;
-
- // validate collection name
- ceph_assert(coll.is_pg());
-
- // try for v8
- set<string> keys;
- keys.insert(infover_key);
- keys.insert(epoch_key);
- map<string,bufferlist> values;
- auto ch = store->open_collection(coll);
- ceph_assert(ch);
- int r = store->omap_get_values(ch, pgmeta_oid, keys, &values);
- if (r == 0) {
- ceph_assert(values.size() == 2);
-
- // sanity check version
- auto bp = values[infover_key].cbegin();
- __u8 struct_v = 0;
- decode(struct_v, bp);
- ceph_assert(struct_v >= 8);
-
- // get epoch
- bp = values[epoch_key].begin();
- decode(cur_epoch, bp);
- } else {
- // probably bug 10617; see OSD::load_pgs()
- return -1;
- }
-
- *pepoch = cur_epoch;
- return 0;
-}
-
-#pragma GCC diagnostic pop
-#pragma GCC diagnostic warning "-Wpragmas"
-
-void PG::write_if_dirty(ObjectStore::Transaction& t)
-{
- map<string,bufferlist> km;
- if (dirty_big_info || dirty_info)
- prepare_write_info(&km);
- pg_log.write_log_and_missing(t, &km, coll, pgmeta_oid, pool.info.require_rollback());
- if (!km.empty())
- t.omap_setkeys(coll, pgmeta_oid, km);
-}
-
-void PG::add_log_entry(const pg_log_entry_t& e, bool applied)
-{
- // raise last_complete only if we were previously up to date
- if (info.last_complete == info.last_update)
- info.last_complete = e.version;
-
- // raise last_update.
- ceph_assert(e.version > info.last_update);
- info.last_update = e.version;
-
- // raise user_version, if it increased (it may have not get bumped
- // by all logged updates)
- if (e.user_version > info.last_user_version)
- info.last_user_version = e.user_version;
-
- // log mutation
- pg_log.add(e, applied);
- dout(10) << "add_log_entry " << e << dendl;
-}
-
-
-void PG::append_log(
- const vector<pg_log_entry_t>& logv,
- eversion_t trim_to,
- eversion_t roll_forward_to,
- ObjectStore::Transaction &t,
- bool transaction_applied,
- bool async)
-{
- if (transaction_applied)
- update_snap_map(logv, t);
-
- /* The primary has sent an info updating the history, but it may not
- * have arrived yet. We want to make sure that we cannot remember this
- * write without remembering that it happened in an interval which went
- * active in epoch history.last_epoch_started.
- */
- if (info.last_epoch_started != info.history.last_epoch_started) {
- info.history.last_epoch_started = info.last_epoch_started;
- }
- if (info.last_interval_started != info.history.last_interval_started) {
- info.history.last_interval_started = info.last_interval_started;
- }
- dout(10) << "append_log " << pg_log.get_log() << " " << logv << dendl;
-
- PGLogEntryHandler handler{this, &t};
- if (!transaction_applied) {
- /* We must be a backfill peer, so it's ok if we apply
- * out-of-turn since we won't be considered when
- * determining a min possible last_update.
- */
- pg_log.roll_forward(&handler);
- }
-
- for (vector<pg_log_entry_t>::const_iterator p = logv.begin();
- p != logv.end();
- ++p) {
- add_log_entry(*p, transaction_applied);
-
- /* We don't want to leave the rollforward artifacts around
- * here past last_backfill. It's ok for the same reason as
- * above */
- if (transaction_applied &&
- p->soid > info.last_backfill) {
- pg_log.roll_forward(&handler);
- }
- }
- auto last = logv.rbegin();
- if (is_primary() && last != logv.rend()) {
- projected_log.skip_can_rollback_to_to_head();
- projected_log.trim(cct, last->version, nullptr, nullptr, nullptr);
- }
-
- if (transaction_applied && roll_forward_to > pg_log.get_can_rollback_to()) {
- pg_log.roll_forward_to(
- roll_forward_to,
- &handler);
- last_rollback_info_trimmed_to_applied = roll_forward_to;
- }
-
- dout(10) << __func__ << " approx pg log length = "
- << pg_log.get_log().approx_size() << dendl;
- dout(10) << __func__ << " transaction_applied = "
- << transaction_applied << dendl;
- if (!transaction_applied || async)
- dout(10) << __func__ << " " << pg_whoami
- << " is async_recovery or backfill target" << dendl;
- pg_log.trim(trim_to, info, transaction_applied, async);
-
- // update the local pg, pg log
- dirty_info = true;
- write_if_dirty(t);
-}
-
-bool PG::check_log_for_corruption(ObjectStore *store)
-{
- /// TODO: this method needs to work with the omap log
- return true;
-}
-
-//! Get the name we're going to save our corrupt page log as
-std::string PG::get_corrupt_pg_log_name() const
-{
- const int MAX_BUF = 512;
- char buf[MAX_BUF];
- struct tm tm_buf;
- time_t my_time(time(NULL));
- const struct tm *t = localtime_r(&my_time, &tm_buf);
- int ret = strftime(buf, sizeof(buf), "corrupt_log_%Y-%m-%d_%k:%M_", t);
- if (ret == 0) {
- dout(0) << "strftime failed" << dendl;
- return "corrupt_log_unknown_time";
- }
- string out(buf);
- out += stringify(info.pgid);
- return out;
-}
-
-int PG::read_info(
- ObjectStore *store, spg_t pgid, const coll_t &coll,
- pg_info_t &info, PastIntervals &past_intervals,
- __u8 &struct_v)
-{
- set<string> keys;
- keys.insert(infover_key);
- keys.insert(info_key);
- keys.insert(biginfo_key);
- keys.insert(fastinfo_key);
- ghobject_t pgmeta_oid(pgid.make_pgmeta_oid());
- map<string,bufferlist> values;
- auto ch = store->open_collection(coll);
- ceph_assert(ch);
- int r = store->omap_get_values(ch, pgmeta_oid, keys, &values);
- ceph_assert(r == 0);
- ceph_assert(values.size() == 3 ||
- values.size() == 4);
-
- auto p = values[infover_key].cbegin();
- decode(struct_v, p);
- ceph_assert(struct_v >= 10);
-
- p = values[info_key].begin();
- decode(info, p);
-
- p = values[biginfo_key].begin();
- decode(past_intervals, p);
- decode(info.purged_snaps, p);
-
- p = values[fastinfo_key].begin();
- if (!p.end()) {
- pg_fast_info_t fast;
- decode(fast, p);
- fast.try_apply_to(&info);
- }
- return 0;
-}
-
-void PG::read_state(ObjectStore *store)
-{
- int r = read_info(store, pg_id, coll, info, past_intervals,
- info_struct_v);
- ceph_assert(r >= 0);
-
- if (info_struct_v < compat_struct_v) {
- derr << "PG needs upgrade, but on-disk data is too old; upgrade to"
- << " an older version first." << dendl;
- ceph_abort_msg("PG too old to upgrade");
- }
-
- last_written_info = info;
-
- ostringstream oss;
- pg_log.read_log_and_missing(
- store,
- ch,
- pgmeta_oid,
- info,
- oss,
- cct->_conf->osd_ignore_stale_divergent_priors,
- cct->_conf->osd_debug_verify_missing_on_start);
- if (oss.tellp())
- osd->clog->error() << oss.str();
-
- // log any weirdness
- log_weirdness();
-
- if (info_struct_v < latest_struct_v) {
- upgrade(store);
- }
-
- // initialize current mapping
- {
- int primary, up_primary;
- vector<int> acting, up;
- get_osdmap()->pg_to_up_acting_osds(
- pg_id.pgid, &up, &up_primary, &acting, &primary);
- init_primary_up_acting(
- up,
- acting,
- up_primary,
- primary);
- int rr = OSDMap::calc_pg_role(osd->whoami, acting);
- if (pool.info.is_replicated() || rr == pg_whoami.shard)
- set_role(rr);
- else
- set_role(-1);
- }
-
- PG::RecoveryCtx rctx(0, 0, 0, new ObjectStore::Transaction);
- handle_initialize(&rctx);
- // note: we don't activate here because we know the OSD will advance maps
- // during boot.
- write_if_dirty(*rctx.transaction);
- store->queue_transaction(ch, std::move(*rctx.transaction));
- delete rctx.transaction;
-}
-
-void PG::log_weirdness()
-{
- if (pg_log.get_tail() != info.log_tail)
- osd->clog->error() << info.pgid
- << " info mismatch, log.tail " << pg_log.get_tail()
- << " != info.log_tail " << info.log_tail;
- if (pg_log.get_head() != info.last_update)
- osd->clog->error() << info.pgid
- << " info mismatch, log.head " << pg_log.get_head()
- << " != info.last_update " << info.last_update;
-
- if (!pg_log.get_log().empty()) {
- // sloppy check
- if ((pg_log.get_log().log.begin()->version <= pg_log.get_tail()))
- osd->clog->error() << info.pgid
- << " log bound mismatch, info (tail,head] ("
- << pg_log.get_tail() << "," << pg_log.get_head() << "]"
- << " actual ["
- << pg_log.get_log().log.begin()->version << ","
- << pg_log.get_log().log.rbegin()->version << "]";
- }
-
- if (pg_log.get_log().caller_ops.size() > pg_log.get_log().log.size()) {
- osd->clog->error() << info.pgid
- << " caller_ops.size " << pg_log.get_log().caller_ops.size()
- << " > log size " << pg_log.get_log().log.size();
- }
-}
-
-void PG::update_snap_map(
- const vector<pg_log_entry_t> &log_entries,
- ObjectStore::Transaction &t)
-{
- for (vector<pg_log_entry_t>::const_iterator i = log_entries.begin();
- i != log_entries.end();
- ++i) {
- OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
- if (i->soid.snap < CEPH_MAXSNAP) {
- if (i->is_delete()) {
- int r = snap_mapper.remove_oid(
- i->soid,
- &_t);
- if (r != 0)
- derr << __func__ << " remove_oid " << i->soid << " failed with " << r << dendl;
- // On removal tolerate missing key corruption
- ceph_assert(r == 0 || r == -ENOENT);
- } else if (i->is_update()) {
- ceph_assert(i->snaps.length() > 0);
- vector<snapid_t> snaps;
- bufferlist snapbl = i->snaps;
- auto p = snapbl.cbegin();
- try {
- decode(snaps, p);
- } catch (...) {
- derr << __func__ << " decode snaps failure on " << *i << dendl;
- snaps.clear();
- }
- set<snapid_t> _snaps(snaps.begin(), snaps.end());
-
- if (i->is_clone() || i->is_promote()) {
- snap_mapper.add_oid(
- i->soid,
- _snaps,
- &_t);
- } else if (i->is_modify()) {
- int r = snap_mapper.update_snaps(
- i->soid,
- _snaps,
- 0,
- &_t);
- ceph_assert(r == 0);
- } else {
- ceph_assert(i->is_clean());
- }
- }
- }
- }
-}
-
-/**
- * filter trimming|trimmed snaps out of snapcontext
- */
-void PG::filter_snapc(vector<snapid_t> &snaps)
-{
- // nothing needs to trim, we can return immediately
- if (snap_trimq.empty() && info.purged_snaps.empty())
- return;
-
- bool filtering = false;
- vector<snapid_t> newsnaps;
- for (vector<snapid_t>::iterator p = snaps.begin();
- p != snaps.end();
- ++p) {
- if (snap_trimq.contains(*p) || info.purged_snaps.contains(*p)) {
- if (!filtering) {
- // start building a new vector with what we've seen so far
- dout(10) << "filter_snapc filtering " << snaps << dendl;
- newsnaps.insert(newsnaps.begin(), snaps.begin(), p);
- filtering = true;
- }
- dout(20) << "filter_snapc removing trimq|purged snap " << *p << dendl;
- } else {
- if (filtering)
- newsnaps.push_back(*p); // continue building new vector
- }
- }
- if (filtering) {
- snaps.swap(newsnaps);
- dout(10) << "filter_snapc result " << snaps << dendl;
- }
-}
-
-void PG::requeue_object_waiters(map<hobject_t, list<OpRequestRef>>& m)
-{
- for (map<hobject_t, list<OpRequestRef>>::iterator it = m.begin();
- it != m.end();
- ++it)
- requeue_ops(it->second);
- m.clear();
-}
-
-void PG::requeue_op(OpRequestRef op)
-{
- auto p = waiting_for_map.find(op->get_source());
- if (p != waiting_for_map.end()) {
- dout(20) << __func__ << " " << op << " (waiting_for_map " << p->first << ")"
- << dendl;
- p->second.push_front(op);
- } else {
- dout(20) << __func__ << " " << op << dendl;
- osd->enqueue_front(
- OpQueueItem(
- unique_ptr<OpQueueItem::OpQueueable>(new PGOpItem(info.pgid, op)),
- op->get_req()->get_cost(),
- op->get_req()->get_priority(),
- op->get_req()->get_recv_stamp(),
- op->get_req()->get_source().num(),
- get_osdmap_epoch()));
- }
-}
-
-void PG::requeue_ops(list<OpRequestRef> &ls)
-{
- for (list<OpRequestRef>::reverse_iterator i = ls.rbegin();
- i != ls.rend();
- ++i) {
- requeue_op(*i);
- }
- ls.clear();
-}
-
-void PG::requeue_map_waiters()
-{
- epoch_t epoch = get_osdmap_epoch();
- auto p = waiting_for_map.begin();
- while (p != waiting_for_map.end()) {
- if (epoch < p->second.front()->min_epoch) {
- dout(20) << __func__ << " " << p->first << " front op "
- << p->second.front() << " must still wait, doing nothing"
- << dendl;
- ++p;
- } else {
- dout(20) << __func__ << " " << p->first << " " << p->second << dendl;
- for (auto q = p->second.rbegin(); q != p->second.rend(); ++q) {
- auto req = *q;
- osd->enqueue_front(OpQueueItem(
- unique_ptr<OpQueueItem::OpQueueable>(new PGOpItem(info.pgid, req)),
- req->get_req()->get_cost(),
- req->get_req()->get_priority(),
- req->get_req()->get_recv_stamp(),
- req->get_req()->get_source().num(),
- epoch));
- }
- p = waiting_for_map.erase(p);
- }
- }
-}
-
-
-// ==========================================================================================
-// SCRUB
-
-/*
- * when holding pg and sched_scrub_lock, then the states are:
- * scheduling:
- * scrubber.reserved = true
- * scrub_rserved_peers includes whoami
- * osd->scrub_pending++
- * scheduling, replica declined:
- * scrubber.reserved = true
- * scrubber.reserved_peers includes -1
- * osd->scrub_pending++
- * pending:
- * scrubber.reserved = true
- * scrubber.reserved_peers.size() == acting.size();
- * pg on scrub_wq
- * osd->scrub_pending++
- * scrubbing:
- * scrubber.reserved = false;
- * scrubber.reserved_peers empty
- * osd->scrubber.active++
- */
-
-// returns true if a scrub has been newly kicked off
-bool PG::sched_scrub()
-{
- bool nodeep_scrub = false;
- ceph_assert(is_locked());
- if (!(is_primary() && is_active() && is_clean() && !is_scrubbing())) {
- return false;
- }
-
- double deep_scrub_interval = 0;
- pool.info.opts.get(pool_opts_t::DEEP_SCRUB_INTERVAL, &deep_scrub_interval);
- if (deep_scrub_interval <= 0) {
- deep_scrub_interval = cct->_conf->osd_deep_scrub_interval;
- }
- bool time_for_deep = ceph_clock_now() >=
- info.history.last_deep_scrub_stamp + deep_scrub_interval;
-
- bool deep_coin_flip = false;
- // Only add random deep scrubs when NOT user initiated scrub
- if (!scrubber.must_scrub)
- deep_coin_flip = (rand() % 100) < cct->_conf->osd_deep_scrub_randomize_ratio * 100;
- dout(20) << __func__ << ": time_for_deep=" << time_for_deep << " deep_coin_flip=" << deep_coin_flip << dendl;
-
- time_for_deep = (time_for_deep || deep_coin_flip);
-
- //NODEEP_SCRUB so ignore time initiated deep-scrub
- if (get_osdmap()->test_flag(CEPH_OSDMAP_NODEEP_SCRUB) ||
- pool.info.has_flag(pg_pool_t::FLAG_NODEEP_SCRUB)) {
- time_for_deep = false;
- nodeep_scrub = true;
- }
-
- if (!scrubber.must_scrub) {
- ceph_assert(!scrubber.must_deep_scrub);
-
- //NOSCRUB so skip regular scrubs
- if ((get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB) ||
- pool.info.has_flag(pg_pool_t::FLAG_NOSCRUB)) && !time_for_deep) {
- if (scrubber.reserved) {
- // cancel scrub if it is still in scheduling,
- // so pgs from other pools where scrub are still legal
- // have a chance to go ahead with scrubbing.
- clear_scrub_reserved();
- scrub_unreserve_replicas();
- }
- return false;
- }
- }
-
- // Clear these in case user issues the scrub/repair command during
- // the scheduling of the scrub/repair (e.g. request reservation)
- scrubber.deep_scrub_on_error = false;
- scrubber.auto_repair = false;
- if (cct->_conf->osd_scrub_auto_repair
- && get_pgbackend()->auto_repair_supported()
- // respect the command from user, and not do auto-repair
- && !scrubber.must_repair
- && !scrubber.must_scrub
- && !scrubber.must_deep_scrub) {
- if (time_for_deep) {
- dout(20) << __func__ << ": auto repair with deep scrubbing" << dendl;
- scrubber.auto_repair = true;
- } else {
- dout(20) << __func__ << ": auto repair with scrubbing, rescrub if errors found" << dendl;
- scrubber.deep_scrub_on_error = true;
- }
- }
-
- bool ret = true;
- if (!scrubber.reserved) {
- ceph_assert(scrubber.reserved_peers.empty());
- if ((cct->_conf->osd_scrub_during_recovery || !osd->is_recovery_active()) &&
- osd->inc_scrubs_pending()) {
- dout(20) << __func__ << ": reserved locally, reserving replicas" << dendl;
- scrubber.reserved = true;
- scrubber.reserved_peers.insert(pg_whoami);
- scrub_reserve_replicas();
- } else {
- dout(20) << __func__ << ": failed to reserve locally" << dendl;
- ret = false;
- }
- }
- if (scrubber.reserved) {
- if (scrubber.reserve_failed) {
- dout(20) << "sched_scrub: failed, a peer declined" << dendl;
- clear_scrub_reserved();
- scrub_unreserve_replicas();
- ret = false;
- } else if (scrubber.reserved_peers.size() == acting.size()) {
- dout(20) << "sched_scrub: success, reserved self and replicas" << dendl;
- if (time_for_deep) {
- dout(10) << "sched_scrub: scrub will be deep" << dendl;
- state_set(PG_STATE_DEEP_SCRUB);
- } else if (!scrubber.must_deep_scrub && info.stats.stats.sum.num_deep_scrub_errors) {
- if (!nodeep_scrub) {
- osd->clog->info() << "osd." << osd->whoami
- << " pg " << info.pgid
- << " Deep scrub errors, upgrading scrub to deep-scrub";
- state_set(PG_STATE_DEEP_SCRUB);
- } else if (!scrubber.must_scrub) {
- osd->clog->error() << "osd." << osd->whoami
- << " pg " << info.pgid
- << " Regular scrub skipped due to deep-scrub errors and nodeep-scrub set";
- clear_scrub_reserved();
- scrub_unreserve_replicas();
- return false;
- } else {
- osd->clog->error() << "osd." << osd->whoami
- << " pg " << info.pgid
- << " Regular scrub request, deep-scrub details will be lost";
- }
- }
- queue_scrub();
- } else {
- // none declined, since scrubber.reserved is set
- dout(20) << "sched_scrub: reserved " << scrubber.reserved_peers << ", waiting for replicas" << dendl;
- }
- }
-
- return ret;
-}
-
-void PG::reg_next_scrub()
-{
- if (!is_primary())
- return;
-
- utime_t reg_stamp;
- bool must = false;
- if (scrubber.must_scrub) {
- // Set the smallest time that isn't utime_t()
- reg_stamp = utime_t(0,1);
- must = true;
- } else if (info.stats.stats_invalid && cct->_conf->osd_scrub_invalid_stats) {
- reg_stamp = ceph_clock_now();
- must = true;
- } else {
- reg_stamp = info.history.last_scrub_stamp;
- }
- // note down the sched_time, so we can locate this scrub, and remove it
- // later on.
- double scrub_min_interval = 0, scrub_max_interval = 0;
- pool.info.opts.get(pool_opts_t::SCRUB_MIN_INTERVAL, &scrub_min_interval);
- pool.info.opts.get(pool_opts_t::SCRUB_MAX_INTERVAL, &scrub_max_interval);
- ceph_assert(scrubber.scrub_reg_stamp == utime_t());
- scrubber.scrub_reg_stamp = osd->reg_pg_scrub(info.pgid,
- reg_stamp,
- scrub_min_interval,
- scrub_max_interval,
- must);
- dout(10) << __func__ << " pg " << pg_id << " register next scrub, scrub time "
- << scrubber.scrub_reg_stamp << ", must = " << (int)must << dendl;
-}
-
-void PG::unreg_next_scrub()
-{
- if (is_primary()) {
- osd->unreg_pg_scrub(info.pgid, scrubber.scrub_reg_stamp);
- scrubber.scrub_reg_stamp = utime_t();
- }
-}
-
-void PG::do_replica_scrub_map(OpRequestRef op)
-{
- const MOSDRepScrubMap *m = static_cast<const MOSDRepScrubMap*>(op->get_req());
- dout(7) << __func__ << " " << *m << dendl;
- if (m->map_epoch < info.history.same_interval_since) {
- dout(10) << __func__ << " discarding old from "
- << m->map_epoch << " < " << info.history.same_interval_since
- << dendl;
- return;
- }
- if (!scrubber.is_chunky_scrub_active()) {
- dout(10) << __func__ << " scrub isn't active" << dendl;
- return;
- }
-
- op->mark_started();
-
- auto p = const_cast<bufferlist&>(m->get_data()).cbegin();
- scrubber.received_maps[m->from].decode(p, info.pgid.pool());
- dout(10) << "map version is "
- << scrubber.received_maps[m->from].valid_through
- << dendl;
-
- dout(10) << __func__ << " waiting_on_whom was " << scrubber.waiting_on_whom
- << dendl;
- ceph_assert(scrubber.waiting_on_whom.count(m->from));
- scrubber.waiting_on_whom.erase(m->from);
- if (m->preempted) {
- dout(10) << __func__ << " replica was preempted, setting flag" << dendl;
- scrub_preempted = true;
- }
- if (scrubber.waiting_on_whom.empty()) {
- requeue_scrub(ops_blocked_by_scrub());
- }
-}
-
-// send scrub v3 messages (chunky scrub)
-void PG::_request_scrub_map(
- pg_shard_t replica, eversion_t version,
- hobject_t start, hobject_t end,
- bool deep,
- bool allow_preemption)
-{
- ceph_assert(replica != pg_whoami);
- dout(10) << "scrub requesting scrubmap from osd." << replica
- << " deep " << (int)deep << dendl;
- MOSDRepScrub *repscrubop = new MOSDRepScrub(
- spg_t(info.pgid.pgid, replica.shard), version,
- get_osdmap_epoch(),
- get_last_peering_reset(),
- start, end, deep,
- allow_preemption,
- scrubber.priority,
- ops_blocked_by_scrub());
- // default priority, we want the rep scrub processed prior to any recovery
- // or client io messages (we are holding a lock!)
- osd->send_message_osd_cluster(
- replica.osd, repscrubop, get_osdmap_epoch());
-}
-
-void PG::handle_scrub_reserve_request(OpRequestRef op)
-{
- dout(7) << __func__ << " " << *op->get_req() << dendl;
- op->mark_started();
- if (scrubber.reserved) {
- dout(10) << __func__ << " ignoring reserve request: Already reserved"
- << dendl;
- return;
- }
- if ((cct->_conf->osd_scrub_during_recovery || !osd->is_recovery_active()) &&
- osd->inc_scrubs_pending()) {
- scrubber.reserved = true;
- } else {
- dout(20) << __func__ << ": failed to reserve remotely" << dendl;
- scrubber.reserved = false;
- }
- const MOSDScrubReserve *m =
- static_cast<const MOSDScrubReserve*>(op->get_req());
- Message *reply = new MOSDScrubReserve(
- spg_t(info.pgid.pgid, primary.shard),
- m->map_epoch,
- scrubber.reserved ? MOSDScrubReserve::GRANT : MOSDScrubReserve::REJECT,
- pg_whoami);
- osd->send_message_osd_cluster(reply, op->get_req()->get_connection());
-}
-
-void PG::handle_scrub_reserve_grant(OpRequestRef op, pg_shard_t from)
-{
- dout(7) << __func__ << " " << *op->get_req() << dendl;
- op->mark_started();
- if (!scrubber.reserved) {
- dout(10) << "ignoring obsolete scrub reserve reply" << dendl;
- return;
- }
- if (scrubber.reserved_peers.find(from) != scrubber.reserved_peers.end()) {
- dout(10) << " already had osd." << from << " reserved" << dendl;
- } else {
- dout(10) << " osd." << from << " scrub reserve = success" << dendl;
- scrubber.reserved_peers.insert(from);
- sched_scrub();
- }
-}
-
-void PG::handle_scrub_reserve_reject(OpRequestRef op, pg_shard_t from)
-{
- dout(7) << __func__ << " " << *op->get_req() << dendl;
- op->mark_started();
- if (!scrubber.reserved) {
- dout(10) << "ignoring obsolete scrub reserve reply" << dendl;
- return;
- }
- if (scrubber.reserved_peers.find(from) != scrubber.reserved_peers.end()) {
- dout(10) << " already had osd." << from << " reserved" << dendl;
- } else {
- /* One decline stops this pg from being scheduled for scrubbing. */
- dout(10) << " osd." << from << " scrub reserve = fail" << dendl;
- scrubber.reserve_failed = true;
- sched_scrub();
- }
-}
-
-void PG::handle_scrub_reserve_release(OpRequestRef op)
-{
- dout(7) << __func__ << " " << *op->get_req() << dendl;
- op->mark_started();
- clear_scrub_reserved();
-}
-
-// We can zero the value of primary num_bytes as just an atomic.
-// However, setting above zero reserves space for backfill and requires
-// the OSDService::stat_lock which protects all OSD usage
-void PG::set_reserved_num_bytes(int64_t primary, int64_t local) {
- ceph_assert(osd->stat_lock.is_locked_by_me());
- primary_num_bytes.store(primary);
- local_num_bytes.store(local);
- return;
-}
-
-void PG::clear_reserved_num_bytes() {
- primary_num_bytes.store(0);
- local_num_bytes.store(0);
- return;
-}
-
-void PG::reject_reservation()
-{
- clear_reserved_num_bytes();
- osd->send_message_osd_cluster(
- primary.osd,
- new MBackfillReserve(
- MBackfillReserve::REJECT,
- spg_t(info.pgid.pgid, primary.shard),
- get_osdmap_epoch()),
- get_osdmap_epoch());
-}
-
-void PG::schedule_backfill_retry(float delay)
-{
- std::lock_guard lock(osd->recovery_request_lock);
- osd->recovery_request_timer.add_event_after(
- delay,
- new QueuePeeringEvt<RequestBackfill>(
- this, get_osdmap_epoch(),
- RequestBackfill()));
-}
-
-void PG::schedule_recovery_retry(float delay)
-{
- std::lock_guard lock(osd->recovery_request_lock);
- osd->recovery_request_timer.add_event_after(
- delay,
- new QueuePeeringEvt<DoRecovery>(
- this, get_osdmap_epoch(),
- DoRecovery()));
-}
-
-void PG::clear_scrub_reserved()
-{
- scrubber.reserved_peers.clear();
- scrubber.reserve_failed = false;
-
- if (scrubber.reserved) {
- scrubber.reserved = false;
- osd->dec_scrubs_pending();
- }
-}
-
-void PG::scrub_reserve_replicas()
-{
- ceph_assert(backfill_targets.empty());
- for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
- i != acting_recovery_backfill.end();
- ++i) {
- if (*i == pg_whoami) continue;
- dout(10) << "scrub requesting reserve from osd." << *i << dendl;
- osd->send_message_osd_cluster(
- i->osd,
- new MOSDScrubReserve(spg_t(info.pgid.pgid, i->shard),
- get_osdmap_epoch(),
- MOSDScrubReserve::REQUEST, pg_whoami),
- get_osdmap_epoch());
- }
-}
-
-void PG::scrub_unreserve_replicas()
-{
- ceph_assert(backfill_targets.empty());
- for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
- i != acting_recovery_backfill.end();
- ++i) {
- if (*i == pg_whoami) continue;
- dout(10) << "scrub requesting unreserve from osd." << *i << dendl;
- osd->send_message_osd_cluster(
- i->osd,
- new MOSDScrubReserve(spg_t(info.pgid.pgid, i->shard),
- get_osdmap_epoch(),
- MOSDScrubReserve::RELEASE, pg_whoami),
- get_osdmap_epoch());
- }
-}
-
-void PG::_scan_rollback_obs(const vector<ghobject_t> &rollback_obs)
-{
- ObjectStore::Transaction t;
- eversion_t trimmed_to = last_rollback_info_trimmed_to_applied;
- for (vector<ghobject_t>::const_iterator i = rollback_obs.begin();
- i != rollback_obs.end();
- ++i) {
- if (i->generation < trimmed_to.version) {
- osd->clog->error() << "osd." << osd->whoami
- << " pg " << info.pgid
- << " found obsolete rollback obj "
- << *i << " generation < trimmed_to "
- << trimmed_to
- << "...repaired";
- t.remove(coll, *i);
- }
- }
- if (!t.empty()) {
- derr << __func__ << ": queueing trans to clean up obsolete rollback objs"
- << dendl;
- osd->store->queue_transaction(ch, std::move(t), NULL);
- }
-}
-
-void PG::_scan_snaps(ScrubMap &smap)
-{
- hobject_t head;
- SnapSet snapset;
-
- // Test qa/standalone/scrub/osd-scrub-snaps.sh uses this message to verify
- // caller using clean_meta_map(), and it works properly.
- dout(20) << __func__ << " start" << dendl;
-
- for (map<hobject_t, ScrubMap::object>::reverse_iterator i = smap.objects.rbegin();
- i != smap.objects.rend();
- ++i) {
- const hobject_t &hoid = i->first;
- ScrubMap::object &o = i->second;
-
- dout(20) << __func__ << " " << hoid << dendl;
-
- ceph_assert(!hoid.is_snapdir());
- if (hoid.is_head()) {
- // parse the SnapSet
- bufferlist bl;
- if (o.attrs.find(SS_ATTR) == o.attrs.end()) {
- continue;
- }
- bl.push_back(o.attrs[SS_ATTR]);
- auto p = bl.cbegin();
- try {
- decode(snapset, p);
- } catch(...) {
- continue;
- }
- head = hoid.get_head();
- continue;
- }
- if (hoid.snap < CEPH_MAXSNAP) {
- // check and if necessary fix snap_mapper
- if (hoid.get_head() != head) {
- derr << __func__ << " no head for " << hoid << " (have " << head << ")"
- << dendl;
- continue;
- }
- set<snapid_t> obj_snaps;
- auto p = snapset.clone_snaps.find(hoid.snap);
- if (p == snapset.clone_snaps.end()) {
- derr << __func__ << " no clone_snaps for " << hoid << " in " << snapset
- << dendl;
- continue;
- }
- obj_snaps.insert(p->second.begin(), p->second.end());
- set<snapid_t> cur_snaps;
- int r = snap_mapper.get_snaps(hoid, &cur_snaps);
- if (r != 0 && r != -ENOENT) {
- derr << __func__ << ": get_snaps returned " << cpp_strerror(r) << dendl;
- ceph_abort();
- }
- if (r == -ENOENT || cur_snaps != obj_snaps) {
- ObjectStore::Transaction t;
- OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
- if (r == 0) {
- r = snap_mapper.remove_oid(hoid, &_t);
- if (r != 0) {
- derr << __func__ << ": remove_oid returned " << cpp_strerror(r)
- << dendl;
- ceph_abort();
- }
- osd->clog->error() << "osd." << osd->whoami
- << " found snap mapper error on pg "
- << info.pgid
- << " oid " << hoid << " snaps in mapper: "
- << cur_snaps << ", oi: "
- << obj_snaps
- << "...repaired";
- } else {
- osd->clog->error() << "osd." << osd->whoami
- << " found snap mapper error on pg "
- << info.pgid
- << " oid " << hoid << " snaps missing in mapper"
- << ", should be: "
- << obj_snaps
- << " was " << cur_snaps << " r " << r
- << "...repaired";
- }
- snap_mapper.add_oid(hoid, obj_snaps, &_t);
-
- // wait for repair to apply to avoid confusing other bits of the system.
- {
- Cond my_cond;
- Mutex my_lock("PG::_scan_snaps my_lock");
- int r = 0;
- bool done;
- t.register_on_applied_sync(
- new C_SafeCond(&my_lock, &my_cond, &done, &r));
- r = osd->store->queue_transaction(ch, std::move(t));
- if (r != 0) {
- derr << __func__ << ": queue_transaction got " << cpp_strerror(r)
- << dendl;
- } else {
- my_lock.Lock();
- while (!done)
- my_cond.Wait(my_lock);
- my_lock.Unlock();
- }
- }
- }
- }
- }
-}
-
-void PG::_repair_oinfo_oid(ScrubMap &smap)
-{
- for (map<hobject_t, ScrubMap::object>::reverse_iterator i = smap.objects.rbegin();
- i != smap.objects.rend();
- ++i) {
- const hobject_t &hoid = i->first;
- ScrubMap::object &o = i->second;
-
- bufferlist bl;
- if (o.attrs.find(OI_ATTR) == o.attrs.end()) {
- continue;
- }
- bl.push_back(o.attrs[OI_ATTR]);
- object_info_t oi;
- try {
- oi.decode(bl);
- } catch(...) {
- continue;
- }
- if (oi.soid != hoid) {
- ObjectStore::Transaction t;
- OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
- osd->clog->error() << "osd." << osd->whoami
- << " found object info error on pg "
- << info.pgid
- << " oid " << hoid << " oid in object info: "
- << oi.soid
- << "...repaired";
- // Fix object info
- oi.soid = hoid;
- bl.clear();
- encode(oi, bl, get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
-
- bufferptr bp(bl.c_str(), bl.length());
- o.attrs[OI_ATTR] = bp;
-
- t.setattr(coll, ghobject_t(hoid), OI_ATTR, bl);
- int r = osd->store->queue_transaction(ch, std::move(t));
- if (r != 0) {
- derr << __func__ << ": queue_transaction got " << cpp_strerror(r)
- << dendl;
- }
- }
- }
-}
-int PG::build_scrub_map_chunk(
- ScrubMap &map,
- ScrubMapBuilder &pos,
- hobject_t start,
- hobject_t end,
- bool deep,
- ThreadPool::TPHandle &handle)
-{
- dout(10) << __func__ << " [" << start << "," << end << ") "
- << " pos " << pos
- << dendl;
-
- // start
- while (pos.empty()) {
- pos.deep = deep;
- map.valid_through = info.last_update;
-
- // objects
- vector<ghobject_t> rollback_obs;
- pos.ret = get_pgbackend()->objects_list_range(
- start,
- end,
- &pos.ls,
- &rollback_obs);
- if (pos.ret < 0) {
- dout(5) << "objects_list_range error: " << pos.ret << dendl;
- return pos.ret;
- }
- if (pos.ls.empty()) {
- break;
- }
- _scan_rollback_obs(rollback_obs);
- pos.pos = 0;
- return -EINPROGRESS;
- }
-
- // scan objects
- while (!pos.done()) {
- int r = get_pgbackend()->be_scan_list(map, pos);
- if (r == -EINPROGRESS) {
- return r;
- }
- }
-
- // finish
- dout(20) << __func__ << " finishing" << dendl;
- ceph_assert(pos.done());
- _repair_oinfo_oid(map);
- if (!is_primary()) {
- ScrubMap for_meta_scrub;
- // In case we restarted smaller chunk, clear old data
- scrubber.cleaned_meta_map.clear_from(scrubber.start);
- scrubber.cleaned_meta_map.insert(map);
- scrubber.clean_meta_map(for_meta_scrub);
- _scan_snaps(for_meta_scrub);
- }
-
- dout(20) << __func__ << " done, got " << map.objects.size() << " items"
- << dendl;
- return 0;
-}
-
-void PG::Scrubber::cleanup_store(ObjectStore::Transaction *t) {
- if (!store)
- return;
- struct OnComplete : Context {
- std::unique_ptr<Scrub::Store> store;
- explicit OnComplete(
- std::unique_ptr<Scrub::Store> &&store)
- : store(std::move(store)) {}
- void finish(int) override {}
- };
- store->cleanup(t);
- t->register_on_complete(new OnComplete(std::move(store)));
- ceph_assert(!store);
-}
-
-void PG::repair_object(
- const hobject_t& soid, list<pair<ScrubMap::object, pg_shard_t> > *ok_peers,
- pg_shard_t bad_peer)
-{
- list<pg_shard_t> op_shards;
- for (auto i : *ok_peers) {
- op_shards.push_back(i.second);
- }
- dout(10) << "repair_object " << soid << " bad_peer osd."
- << bad_peer << " ok_peers osd.{" << op_shards << "}" << dendl;
- ScrubMap::object &po = ok_peers->back().first;
- eversion_t v;
- bufferlist bv;
- bv.push_back(po.attrs[OI_ATTR]);
- object_info_t oi;
- try {
- auto bliter = bv.cbegin();
- decode(oi, bliter);
- } catch (...) {
- dout(0) << __func__ << ": Need version of replica, bad object_info_t: " << soid << dendl;
- ceph_abort();
- }
- if (bad_peer != primary) {
- peer_missing[bad_peer].add(soid, oi.version, eversion_t(), false);
- } else {
- // We should only be scrubbing if the PG is clean.
- ceph_assert(waiting_for_unreadable_object.empty());
-
- pg_log.missing_add(soid, oi.version, eversion_t());
-
- pg_log.set_last_requested(0);
- dout(10) << __func__ << ": primary = " << primary << dendl;
- }
-
- if (is_ec_pg() || bad_peer == primary) {
- // we'd better collect all shard for EC pg, and prepare good peers as the
- // source of pull in the case of replicated pg.
- missing_loc.add_missing(soid, oi.version, eversion_t());
- list<pair<ScrubMap::object, pg_shard_t> >::iterator i;
- for (i = ok_peers->begin();
- i != ok_peers->end();
- ++i)
- missing_loc.add_location(soid, i->second);
- }
-}
-
-/* replica_scrub
- *
- * Wait for last_update_applied to match msg->scrub_to as above. Wait
- * for pushes to complete in case of recent recovery. Build a single
- * scrubmap of objects that are in the range [msg->start, msg->end).
- */
-void PG::replica_scrub(
- OpRequestRef op,
- ThreadPool::TPHandle &handle)
-{
- const MOSDRepScrub *msg = static_cast<const MOSDRepScrub *>(op->get_req());
- ceph_assert(!scrubber.active_rep_scrub);
- dout(7) << "replica_scrub" << dendl;
-
- if (msg->map_epoch < info.history.same_interval_since) {
- dout(10) << "replica_scrub discarding old replica_scrub from "
- << msg->map_epoch << " < " << info.history.same_interval_since
- << dendl;
- return;
- }
-
- ceph_assert(msg->chunky);
- if (active_pushes > 0) {
- dout(10) << "waiting for active pushes to finish" << dendl;
- scrubber.active_rep_scrub = op;
- return;
- }
-
- scrubber.state = Scrubber::BUILD_MAP_REPLICA;
- scrubber.replica_scrub_start = msg->min_epoch;
- scrubber.start = msg->start;
- scrubber.end = msg->end;
- scrubber.max_end = msg->end;
- scrubber.deep = msg->deep;
- scrubber.epoch_start = info.history.same_interval_since;
- if (msg->priority) {
- scrubber.priority = msg->priority;
- } else {
- scrubber.priority = get_scrub_priority();
- }
-
- scrub_can_preempt = msg->allow_preemption;
- scrub_preempted = false;
- scrubber.replica_scrubmap_pos.reset();
-
- requeue_scrub(msg->high_priority);
-}
-
-/* Scrub:
- * PG_STATE_SCRUBBING is set when the scrub is queued
- *
- * scrub will be chunky if all OSDs in PG support chunky scrub
- * scrub will fail if OSDs are too old.
- */
-void PG::scrub(epoch_t queued, ThreadPool::TPHandle &handle)
-{
- if (cct->_conf->osd_scrub_sleep > 0 &&
- (scrubber.state == PG::Scrubber::NEW_CHUNK ||
- scrubber.state == PG::Scrubber::INACTIVE) &&
- scrubber.needs_sleep) {
- ceph_assert(!scrubber.sleeping);
- dout(20) << __func__ << " state is INACTIVE|NEW_CHUNK, sleeping" << dendl;
-
- // Do an async sleep so we don't block the op queue
- OSDService *osds = osd;
- spg_t pgid = get_pgid();
- int state = scrubber.state;
- auto scrub_requeue_callback =
- new FunctionContext([osds, pgid, state](int r) {
- PGRef pg = osds->osd->lookup_lock_pg(pgid);
- if (pg == nullptr) {
- lgeneric_dout(osds->osd->cct, 20)
- << "scrub_requeue_callback: Could not find "
- << "PG " << pgid << " can't complete scrub requeue after sleep"
- << dendl;
- return;
- }
- pg->scrubber.sleeping = false;
- pg->scrubber.needs_sleep = false;
- lgeneric_dout(pg->cct, 20)
- << "scrub_requeue_callback: slept for "
- << ceph_clock_now() - pg->scrubber.sleep_start
- << ", re-queuing scrub with state " << state << dendl;
- pg->scrub_queued = false;
- pg->requeue_scrub();
- pg->scrubber.sleep_start = utime_t();
- pg->unlock();
- });
- std::lock_guard l(osd->sleep_lock);
- osd->sleep_timer.add_event_after(cct->_conf->osd_scrub_sleep,
- scrub_requeue_callback);
- scrubber.sleeping = true;
- scrubber.sleep_start = ceph_clock_now();
- return;
- }
- if (pg_has_reset_since(queued)) {
- return;
- }
- ceph_assert(scrub_queued);
- scrub_queued = false;
- scrubber.needs_sleep = true;
-
- // for the replica
- if (!is_primary() &&
- scrubber.state == PG::Scrubber::BUILD_MAP_REPLICA) {
- chunky_scrub(handle);
- return;
- }
-
- if (!is_primary() || !is_active() || !is_clean() || !is_scrubbing()) {
- dout(10) << "scrub -- not primary or active or not clean" << dendl;
- state_clear(PG_STATE_SCRUBBING);
- state_clear(PG_STATE_REPAIR);
- state_clear(PG_STATE_DEEP_SCRUB);
- publish_stats_to_osd();
- return;
- }
-
- if (!scrubber.active) {
- ceph_assert(backfill_targets.empty());
-
- scrubber.deep = state_test(PG_STATE_DEEP_SCRUB);
-
- dout(10) << "starting a new chunky scrub" << dendl;
- }
-
- chunky_scrub(handle);
-}
-
-/*
- * Chunky scrub scrubs objects one chunk at a time with writes blocked for that
- * chunk.
- *
- * The object store is partitioned into chunks which end on hash boundaries. For
- * each chunk, the following logic is performed:
- *
- * (1) Block writes on the chunk
- * (2) Request maps from replicas
- * (3) Wait for pushes to be applied (after recovery)
- * (4) Wait for writes to flush on the chunk
- * (5) Wait for maps from replicas
- * (6) Compare / repair all scrub maps
- * (7) Wait for digest updates to apply
- *
- * This logic is encoded in the mostly linear state machine:
- *
- * +------------------+
- * _________v__________ |
- * | | |
- * | INACTIVE | |
- * |____________________| |
- * | |
- * | +----------+ |
- * _________v___v______ | |
- * | | | |
- * | NEW_CHUNK | | |
- * |____________________| | |
- * | | |
- * _________v__________ | |
- * | | | |
- * | WAIT_PUSHES | | |
- * |____________________| | |
- * | | |
- * _________v__________ | |
- * | | | |
- * | WAIT_LAST_UPDATE | | |
- * |____________________| | |
- * | | |
- * _________v__________ | |
- * | | | |
- * | BUILD_MAP | | |
- * |____________________| | |
- * | | |
- * _________v__________ | |
- * | | | |
- * | WAIT_REPLICAS | | |
- * |____________________| | |
- * | | |
- * _________v__________ | |
- * | | | |
- * | COMPARE_MAPS | | |
- * |____________________| | |
- * | | |
- * | | |
- * _________v__________ | |
- * | | | |
- * |WAIT_DIGEST_UPDATES | | |
- * |____________________| | |
- * | | | |
- * | +----------+ |
- * _________v__________ |
- * | | |
- * | FINISH | |
- * |____________________| |
- * | |
- * +------------------+
- *
- * The primary determines the last update from the subset by walking the log. If
- * it sees a log entry pertaining to a file in the chunk, it tells the replicas
- * to wait until that update is applied before building a scrub map. Both the
- * primary and replicas will wait for any active pushes to be applied.
- *
- * In contrast to classic_scrub, chunky_scrub is entirely handled by scrub_wq.
- *
- * scrubber.state encodes the current state of the scrub (refer to state diagram
- * for details).
- */
-void PG::chunky_scrub(ThreadPool::TPHandle &handle)
-{
- // check for map changes
- if (scrubber.is_chunky_scrub_active()) {
- if (scrubber.epoch_start != info.history.same_interval_since) {
- dout(10) << "scrub pg changed, aborting" << dendl;
- scrub_clear_state();
- scrub_unreserve_replicas();
- return;
- }
- }
-
- bool done = false;
- int ret;
-
- while (!done) {
- dout(20) << "scrub state " << Scrubber::state_string(scrubber.state)
- << " [" << scrubber.start << "," << scrubber.end << ")"
- << " max_end " << scrubber.max_end << dendl;
-
- switch (scrubber.state) {
- case PG::Scrubber::INACTIVE:
- dout(10) << "scrub start" << dendl;
- ceph_assert(is_primary());
-
- publish_stats_to_osd();
- scrubber.epoch_start = info.history.same_interval_since;
- scrubber.active = true;
-
- osd->inc_scrubs_active(scrubber.reserved);
- if (scrubber.reserved) {
- scrubber.reserved = false;
- scrubber.reserved_peers.clear();
- }
-
- {
- ObjectStore::Transaction t;
- scrubber.cleanup_store(&t);
- scrubber.store.reset(Scrub::Store::create(osd->store, &t,
- info.pgid, coll));
- osd->store->queue_transaction(ch, std::move(t), nullptr);
- }
-
- // Don't include temporary objects when scrubbing
- scrubber.start = info.pgid.pgid.get_hobj_start();
- scrubber.state = PG::Scrubber::NEW_CHUNK;
-
- {
- bool repair = state_test(PG_STATE_REPAIR);
- bool deep_scrub = state_test(PG_STATE_DEEP_SCRUB);
- const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub"));
- stringstream oss;
- oss << info.pgid.pgid << " " << mode << " starts" << std::endl;
- osd->clog->debug(oss);
- }
-
- scrubber.preempt_left = cct->_conf.get_val<uint64_t>(
- "osd_scrub_max_preemptions");
- scrubber.preempt_divisor = 1;
- break;
-
- case PG::Scrubber::NEW_CHUNK:
- scrubber.primary_scrubmap = ScrubMap();
- scrubber.received_maps.clear();
-
- // begin (possible) preemption window
- if (scrub_preempted) {
- scrubber.preempt_left--;
- scrubber.preempt_divisor *= 2;
- dout(10) << __func__ << " preempted, " << scrubber.preempt_left
- << " left" << dendl;
- scrub_preempted = false;
- }
- scrub_can_preempt = scrubber.preempt_left > 0;
-
- {
- /* get the start and end of our scrub chunk
- *
- * Our scrub chunk has an important restriction we're going to need to
- * respect. We can't let head be start or end.
- * Using a half-open interval means that if end == head,
- * we'd scrub/lock head and the clone right next to head in different
- * chunks which would allow us to miss clones created between
- * scrubbing that chunk and scrubbing the chunk including head.
- * This isn't true for any of the other clones since clones can
- * only be created "just to the left of" head. There is one exception
- * to this: promotion of clones which always happens to the left of the
- * left-most clone, but promote_object checks the scrubber in that
- * case, so it should be ok. Also, it's ok to "miss" clones at the
- * left end of the range if we are a tier because they may legitimately
- * not exist (see _scrub).
- */
- int min = std::max<int64_t>(3, cct->_conf->osd_scrub_chunk_min /
- scrubber.preempt_divisor);
- int max = std::max<int64_t>(min, cct->_conf->osd_scrub_chunk_max /
- scrubber.preempt_divisor);
- hobject_t start = scrubber.start;
- hobject_t candidate_end;
- vector<hobject_t> objects;
- ret = get_pgbackend()->objects_list_partial(
- start,
- min,
- max,
- &objects,
- &candidate_end);
- ceph_assert(ret >= 0);
-
- if (!objects.empty()) {
- hobject_t back = objects.back();
- while (candidate_end.is_head() &&
- candidate_end == back.get_head()) {
- candidate_end = back;
- objects.pop_back();
- if (objects.empty()) {
- ceph_assert(0 ==
- "Somehow we got more than 2 objects which"
- "have the same head but are not clones");
- }
- back = objects.back();
- }
- if (candidate_end.is_head()) {
- ceph_assert(candidate_end != back.get_head());
- candidate_end = candidate_end.get_object_boundary();
- }
- } else {
- ceph_assert(candidate_end.is_max());
- }
-
- if (!_range_available_for_scrub(scrubber.start, candidate_end)) {
- // we'll be requeued by whatever made us unavailable for scrub
- dout(10) << __func__ << ": scrub blocked somewhere in range "
- << "[" << scrubber.start << ", " << candidate_end << ")"
- << dendl;
- done = true;
- break;
- }
- scrubber.end = candidate_end;
- if (scrubber.end > scrubber.max_end)
- scrubber.max_end = scrubber.end;
- }
-
- // walk the log to find the latest update that affects our chunk
- scrubber.subset_last_update = eversion_t();
- for (auto p = projected_log.log.rbegin();
- p != projected_log.log.rend();
- ++p) {
- if (p->soid >= scrubber.start &&
- p->soid < scrubber.end) {
- scrubber.subset_last_update = p->version;
- break;
- }
- }
- if (scrubber.subset_last_update == eversion_t()) {
- for (list<pg_log_entry_t>::const_reverse_iterator p =
- pg_log.get_log().log.rbegin();
- p != pg_log.get_log().log.rend();
- ++p) {
- if (p->soid >= scrubber.start &&
- p->soid < scrubber.end) {
- scrubber.subset_last_update = p->version;
- break;
- }
- }
- }
-
- scrubber.state = PG::Scrubber::WAIT_PUSHES;
- break;
-
- case PG::Scrubber::WAIT_PUSHES:
- if (active_pushes == 0) {
- scrubber.state = PG::Scrubber::WAIT_LAST_UPDATE;
- } else {
- dout(15) << "wait for pushes to apply" << dendl;
- done = true;
- }
- break;
-
- case PG::Scrubber::WAIT_LAST_UPDATE:
- if (last_update_applied < scrubber.subset_last_update) {
- // will be requeued by op_applied
- dout(15) << "wait for EC read/modify/writes to queue" << dendl;
- done = true;
- break;
- }
-
- // ask replicas to scan
- scrubber.waiting_on_whom.insert(pg_whoami);
-
- // request maps from replicas
- for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
- i != acting_recovery_backfill.end();
- ++i) {
- if (*i == pg_whoami) continue;
- _request_scrub_map(*i, scrubber.subset_last_update,
- scrubber.start, scrubber.end, scrubber.deep,
- scrubber.preempt_left > 0);
- scrubber.waiting_on_whom.insert(*i);
- }
- dout(10) << __func__ << " waiting_on_whom " << scrubber.waiting_on_whom
- << dendl;
-
- scrubber.state = PG::Scrubber::BUILD_MAP;
- scrubber.primary_scrubmap_pos.reset();
- break;
-
- case PG::Scrubber::BUILD_MAP:
- ceph_assert(last_update_applied >= scrubber.subset_last_update);
-
- // build my own scrub map
- if (scrub_preempted) {
- dout(10) << __func__ << " preempted" << dendl;
- scrubber.state = PG::Scrubber::BUILD_MAP_DONE;
- break;
- }
- ret = build_scrub_map_chunk(
- scrubber.primary_scrubmap,
- scrubber.primary_scrubmap_pos,
- scrubber.start, scrubber.end,
- scrubber.deep,
- handle);
- if (ret == -EINPROGRESS) {
- requeue_scrub();
- done = true;
- break;
- }
- scrubber.state = PG::Scrubber::BUILD_MAP_DONE;
- break;
-
- case PG::Scrubber::BUILD_MAP_DONE:
- if (scrubber.primary_scrubmap_pos.ret < 0) {
- dout(5) << "error: " << scrubber.primary_scrubmap_pos.ret
- << ", aborting" << dendl;
- scrub_clear_state();
- scrub_unreserve_replicas();
- return;
- }
- dout(10) << __func__ << " waiting_on_whom was "
- << scrubber.waiting_on_whom << dendl;
- ceph_assert(scrubber.waiting_on_whom.count(pg_whoami));
- scrubber.waiting_on_whom.erase(pg_whoami);
-
- scrubber.state = PG::Scrubber::WAIT_REPLICAS;
- break;
-
- case PG::Scrubber::WAIT_REPLICAS:
- if (!scrubber.waiting_on_whom.empty()) {
- // will be requeued by sub_op_scrub_map
- dout(10) << "wait for replicas to build scrub map" << dendl;
- done = true;
- break;
- }
- // end (possible) preemption window
- scrub_can_preempt = false;
- if (scrub_preempted) {
- dout(10) << __func__ << " preempted, restarting chunk" << dendl;
- scrubber.state = PG::Scrubber::NEW_CHUNK;
- } else {
- scrubber.state = PG::Scrubber::COMPARE_MAPS;
- }
- break;
-
- case PG::Scrubber::COMPARE_MAPS:
- ceph_assert(last_update_applied >= scrubber.subset_last_update);
- ceph_assert(scrubber.waiting_on_whom.empty());
-
- scrub_compare_maps();
- scrubber.start = scrubber.end;
- scrubber.run_callbacks();
-
- // requeue the writes from the chunk that just finished
- requeue_ops(waiting_for_scrub);
-
- scrubber.state = PG::Scrubber::WAIT_DIGEST_UPDATES;
-
- // fall-thru
-
- case PG::Scrubber::WAIT_DIGEST_UPDATES:
- if (scrubber.num_digest_updates_pending) {
- dout(10) << __func__ << " waiting on "
- << scrubber.num_digest_updates_pending
- << " digest updates" << dendl;
- done = true;
- break;
- }
-
- scrubber.preempt_left = cct->_conf.get_val<uint64_t>(
- "osd_scrub_max_preemptions");
- scrubber.preempt_divisor = 1;
-
- if (!(scrubber.end.is_max())) {
- scrubber.state = PG::Scrubber::NEW_CHUNK;
- requeue_scrub();
- done = true;
- } else {
- scrubber.state = PG::Scrubber::FINISH;
- }
-
- break;
-
- case PG::Scrubber::FINISH:
- scrub_finish();
- scrubber.state = PG::Scrubber::INACTIVE;
- done = true;
-
- if (!snap_trimq.empty()) {
- dout(10) << "scrub finished, requeuing snap_trimmer" << dendl;
- snap_trimmer_scrub_complete();
- }
-
- break;
-
- case PG::Scrubber::BUILD_MAP_REPLICA:
- // build my own scrub map
- if (scrub_preempted) {
- dout(10) << __func__ << " preempted" << dendl;
- ret = 0;
- } else {
- ret = build_scrub_map_chunk(
- scrubber.replica_scrubmap,
- scrubber.replica_scrubmap_pos,
- scrubber.start, scrubber.end,
- scrubber.deep,
- handle);
- }
- if (ret == -EINPROGRESS) {
- requeue_scrub();
- done = true;
- break;
- }
- // reply
- {
- MOSDRepScrubMap *reply = new MOSDRepScrubMap(
- spg_t(info.pgid.pgid, get_primary().shard),
- scrubber.replica_scrub_start,
- pg_whoami);
- reply->preempted = scrub_preempted;
- ::encode(scrubber.replica_scrubmap, reply->get_data());
- osd->send_message_osd_cluster(
- get_primary().osd, reply,
- scrubber.replica_scrub_start);
- }
- scrub_preempted = false;
- scrub_can_preempt = false;
- scrubber.state = PG::Scrubber::INACTIVE;
- scrubber.replica_scrubmap = ScrubMap();
- scrubber.replica_scrubmap_pos = ScrubMapBuilder();
- scrubber.start = hobject_t();
- scrubber.end = hobject_t();
- scrubber.max_end = hobject_t();
- done = true;
- break;
-
- default:
- ceph_abort();
- }
- }
- dout(20) << "scrub final state " << Scrubber::state_string(scrubber.state)
- << " [" << scrubber.start << "," << scrubber.end << ")"
- << " max_end " << scrubber.max_end << dendl;
-}
-
-bool PG::write_blocked_by_scrub(const hobject_t& soid)
-{
- if (soid < scrubber.start || soid >= scrubber.end) {
- return false;
- }
- if (scrub_can_preempt) {
- if (!scrub_preempted) {
- dout(10) << __func__ << " " << soid << " preempted" << dendl;
- scrub_preempted = true;
- } else {
- dout(10) << __func__ << " " << soid << " already preempted" << dendl;
- }
- return false;
- }
- return true;
-}
-
-bool PG::range_intersects_scrub(const hobject_t &start, const hobject_t& end)
-{
- // does [start, end] intersect [scrubber.start, scrubber.max_end)
- return (start < scrubber.max_end &&
- end >= scrubber.start);
-}
-
-void PG::scrub_clear_state(bool has_error)
-{
- ceph_assert(is_locked());
- state_clear(PG_STATE_SCRUBBING);
- if (!has_error)
- state_clear(PG_STATE_REPAIR);
- state_clear(PG_STATE_DEEP_SCRUB);
- publish_stats_to_osd();
-
- // active -> nothing.
- if (scrubber.active)
- osd->dec_scrubs_active();
-
- requeue_ops(waiting_for_scrub);
-
- scrubber.reset();
-
- // type-specific state clear
- _scrub_clear_state();
-}
-
-void PG::scrub_compare_maps()
-{
- dout(10) << __func__ << " has maps, analyzing" << dendl;
-
- // construct authoritative scrub map for type specific scrubbing
- scrubber.cleaned_meta_map.insert(scrubber.primary_scrubmap);
- map<hobject_t,
- pair<boost::optional<uint32_t>,
- boost::optional<uint32_t>>> missing_digest;
-
- map<pg_shard_t, ScrubMap *> maps;
- maps[pg_whoami] = &scrubber.primary_scrubmap;
-
- for (const auto& i : acting_recovery_backfill) {
- if (i == pg_whoami) continue;
- dout(2) << __func__ << " replica " << i << " has "
- << scrubber.received_maps[i].objects.size()
- << " items" << dendl;
- maps[i] = &scrubber.received_maps[i];
- }
-
- set<hobject_t> master_set;
-
- // Construct master set
- for (const auto map : maps) {
- for (const auto i : map.second->objects) {
- master_set.insert(i.first);
- }
- }
-
- stringstream ss;
- get_pgbackend()->be_omap_checks(maps, master_set,
- scrubber.omap_stats, ss);
-
- if (!ss.str().empty()) {
- osd->clog->warn(ss);
- }
-
- if (acting.size() > 1) {
- dout(10) << __func__ << " comparing replica scrub maps" << dendl;
-
- // Map from object with errors to good peer
- map<hobject_t, list<pg_shard_t>> authoritative;
-
- dout(2) << __func__ << " osd." << acting[0] << " has "
- << scrubber.primary_scrubmap.objects.size() << " items" << dendl;
-
- ss.str("");
- ss.clear();
-
- get_pgbackend()->be_compare_scrubmaps(
- maps,
- master_set,
- state_test(PG_STATE_REPAIR),
- scrubber.missing,
- scrubber.inconsistent,
- authoritative,
- missing_digest,
- scrubber.shallow_errors,
- scrubber.deep_errors,
- scrubber.store.get(),
- info.pgid, acting,
- ss);
- dout(2) << ss.str() << dendl;
-
- if (!ss.str().empty()) {
- osd->clog->error(ss);
- }
-
- for (map<hobject_t, list<pg_shard_t>>::iterator i = authoritative.begin();
- i != authoritative.end();
- ++i) {
- list<pair<ScrubMap::object, pg_shard_t> > good_peers;
- for (list<pg_shard_t>::const_iterator j = i->second.begin();
- j != i->second.end();
- ++j) {
- good_peers.push_back(make_pair(maps[*j]->objects[i->first], *j));
- }
- scrubber.authoritative.insert(
- make_pair(
- i->first,
- good_peers));
- }
-
- for (map<hobject_t, list<pg_shard_t>>::iterator i = authoritative.begin();
- i != authoritative.end();
- ++i) {
- scrubber.cleaned_meta_map.objects.erase(i->first);
- scrubber.cleaned_meta_map.objects.insert(
- *(maps[i->second.back()]->objects.find(i->first))
- );
- }
- }
-
- ScrubMap for_meta_scrub;
- scrubber.clean_meta_map(for_meta_scrub);
-
- // ok, do the pg-type specific scrubbing
- scrub_snapshot_metadata(for_meta_scrub, missing_digest);
- // Called here on the primary can use an authoritative map if it isn't the primary
- _scan_snaps(for_meta_scrub);
- if (!scrubber.store->empty()) {
- if (state_test(PG_STATE_REPAIR)) {
- dout(10) << __func__ << ": discarding scrub results" << dendl;
- scrubber.store->flush(nullptr);
- } else {
- dout(10) << __func__ << ": updating scrub object" << dendl;
- ObjectStore::Transaction t;
- scrubber.store->flush(&t);
- osd->store->queue_transaction(ch, std::move(t), nullptr);
- }
- }
-}
-
-bool PG::scrub_process_inconsistent()
-{
- dout(10) << __func__ << ": checking authoritative" << dendl;
- bool repair = state_test(PG_STATE_REPAIR);
- bool deep_scrub = state_test(PG_STATE_DEEP_SCRUB);
- const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub"));
-
- // authoriative only store objects which missing or inconsistent.
- if (!scrubber.authoritative.empty()) {
- stringstream ss;
- ss << info.pgid << " " << mode << " "
- << scrubber.missing.size() << " missing, "
- << scrubber.inconsistent.size() << " inconsistent objects";
- dout(2) << ss.str() << dendl;
- osd->clog->error(ss);
- if (repair) {
- state_clear(PG_STATE_CLEAN);
- for (map<hobject_t, list<pair<ScrubMap::object, pg_shard_t> >>::iterator i =
- scrubber.authoritative.begin();
- i != scrubber.authoritative.end();
- ++i) {
- set<pg_shard_t>::iterator j;
-
- auto missing_entry = scrubber.missing.find(i->first);
- if (missing_entry != scrubber.missing.end()) {
- for (j = missing_entry->second.begin();
- j != missing_entry->second.end();
- ++j) {
- repair_object(
- i->first,
- &(i->second),
- *j);
- ++scrubber.fixed;
- }
- }
- if (scrubber.inconsistent.count(i->first)) {
- for (j = scrubber.inconsistent[i->first].begin();
- j != scrubber.inconsistent[i->first].end();
- ++j) {
- repair_object(i->first,
- &(i->second),
- *j);
- ++scrubber.fixed;
- }
- }
- }
- }
- }
- return (!scrubber.authoritative.empty() && repair);
-}
-
-bool PG::ops_blocked_by_scrub() const {
- return (waiting_for_scrub.size() != 0);
-}
-
-// the part that actually finalizes a scrub
-void PG::scrub_finish()
-{
- dout(20) << __func__ << dendl;
- bool repair = state_test(PG_STATE_REPAIR);
- bool do_deep_scrub = false;
- // if the repair request comes from auto-repair and large number of errors,
- // we would like to cancel auto-repair
- if (repair && scrubber.auto_repair
- && scrubber.authoritative.size() > cct->_conf->osd_scrub_auto_repair_num_errors) {
- state_clear(PG_STATE_REPAIR);
- repair = false;
- }
- bool deep_scrub = state_test(PG_STATE_DEEP_SCRUB);
- const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub"));
-
- // if a regular scrub had errors within the limit, do a deep scrub to auto repair.
- if (scrubber.deep_scrub_on_error
- && scrubber.authoritative.size() <= cct->_conf->osd_scrub_auto_repair_num_errors) {
- ceph_assert(!deep_scrub);
- scrubber.deep_scrub_on_error = false;
- do_deep_scrub = true;
- dout(20) << __func__ << " Try to auto repair after scrub errors" << dendl;
- }
-
- // type-specific finish (can tally more errors)
- _scrub_finish();
-
- bool has_error = scrub_process_inconsistent();
-
- {
- stringstream oss;
- oss << info.pgid.pgid << " " << mode << " ";
- int total_errors = scrubber.shallow_errors + scrubber.deep_errors;
- if (total_errors)
- oss << total_errors << " errors";
- else
- oss << "ok";
- if (!deep_scrub && info.stats.stats.sum.num_deep_scrub_errors)
- oss << " ( " << info.stats.stats.sum.num_deep_scrub_errors
- << " remaining deep scrub error details lost)";
- if (repair)
- oss << ", " << scrubber.fixed << " fixed";
- if (total_errors)
- osd->clog->error(oss);
- else
- osd->clog->debug(oss);
- }
-
- // finish up
- unreg_next_scrub();
- utime_t now = ceph_clock_now();
- info.history.last_scrub = info.last_update;
- info.history.last_scrub_stamp = now;
- if (scrubber.deep) {
- info.history.last_deep_scrub = info.last_update;
- info.history.last_deep_scrub_stamp = now;
- }
- // Since we don't know which errors were fixed, we can only clear them
- // when every one has been fixed.
- if (repair) {
- if (scrubber.fixed == scrubber.shallow_errors + scrubber.deep_errors) {
- ceph_assert(deep_scrub);
- scrubber.shallow_errors = scrubber.deep_errors = 0;
- dout(20) << __func__ << " All may be fixed" << dendl;
- } else if (has_error) {
- // Deep scrub in order to get corrected error counts
- scrub_after_recovery = true;
- dout(20) << __func__ << " Set scrub_after_recovery" << dendl;
- } else if (scrubber.shallow_errors || scrubber.deep_errors) {
- // We have errors but nothing can be fixed, so there is no repair
- // possible.
- state_set(PG_STATE_FAILED_REPAIR);
- dout(10) << __func__ << " " << (scrubber.shallow_errors + scrubber.deep_errors)
- << " error(s) present with no repair possible" << dendl;
- }
- }
- if (deep_scrub) {
- if ((scrubber.shallow_errors == 0) && (scrubber.deep_errors == 0))
- info.history.last_clean_scrub_stamp = now;
- info.stats.stats.sum.num_shallow_scrub_errors = scrubber.shallow_errors;
- info.stats.stats.sum.num_deep_scrub_errors = scrubber.deep_errors;
- info.stats.stats.sum.num_large_omap_objects = scrubber.omap_stats.large_omap_objects;
- info.stats.stats.sum.num_omap_bytes = scrubber.omap_stats.omap_bytes;
- info.stats.stats.sum.num_omap_keys = scrubber.omap_stats.omap_keys;
- dout(25) << __func__ << " shard " << pg_whoami << " num_omap_bytes = "
- << info.stats.stats.sum.num_omap_bytes << " num_omap_keys = "
- << info.stats.stats.sum.num_omap_keys << dendl;
- } else {
- info.stats.stats.sum.num_shallow_scrub_errors = scrubber.shallow_errors;
- // XXX: last_clean_scrub_stamp doesn't mean the pg is not inconsistent
- // because of deep-scrub errors
- if (scrubber.shallow_errors == 0)
- info.history.last_clean_scrub_stamp = now;
- }
- info.stats.stats.sum.num_scrub_errors =
- info.stats.stats.sum.num_shallow_scrub_errors +
- info.stats.stats.sum.num_deep_scrub_errors;
- if (scrubber.check_repair) {
- scrubber.check_repair = false;
- if (info.stats.stats.sum.num_scrub_errors) {
- state_set(PG_STATE_FAILED_REPAIR);
- dout(10) << __func__ << " " << info.stats.stats.sum.num_scrub_errors
- << " error(s) still present after re-scrub" << dendl;
- }
- }
- publish_stats_to_osd();
- if (do_deep_scrub) {
- // XXX: Auto scrub won't activate if must_scrub is set, but
- // setting the scrub stamps affects what users see.
- utime_t stamp = utime_t(0,1);
- set_last_scrub_stamp(stamp);
- set_last_deep_scrub_stamp(stamp);
- }
- reg_next_scrub();
-
- {
- ObjectStore::Transaction t;
- dirty_info = true;
- write_if_dirty(t);
- int tr = osd->store->queue_transaction(ch, std::move(t), NULL);
- ceph_assert(tr == 0);
- }
-
-
- if (has_error) {
- queue_peering_event(
- PGPeeringEventRef(
- std::make_shared<PGPeeringEvent>(
- get_osdmap_epoch(),
- get_osdmap_epoch(),
- DoRecovery())));
- }
-
- scrub_clear_state(has_error);
- scrub_unreserve_replicas();
-
- if (is_active() && is_primary()) {
- share_pg_info();
- }
-}
-
-void PG::share_pg_info()
-{
- dout(10) << "share_pg_info" << dendl;
-
- // share new pg_info_t with replicas
- ceph_assert(!acting_recovery_backfill.empty());
- for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
- i != acting_recovery_backfill.end();
- ++i) {
- if (*i == pg_whoami) continue;
- auto pg_shard = *i;
- auto peer = peer_info.find(pg_shard);
- if (peer != peer_info.end()) {
- peer->second.last_epoch_started = info.last_epoch_started;
- peer->second.last_interval_started = info.last_interval_started;
- peer->second.history.merge(info.history);
- }
- MOSDPGInfo *m = new MOSDPGInfo(get_osdmap_epoch());
- m->pg_list.push_back(
- make_pair(
- pg_notify_t(
- pg_shard.shard, pg_whoami.shard,
- get_osdmap_epoch(),
- get_osdmap_epoch(),
- info),
- past_intervals));
- osd->send_message_osd_cluster(pg_shard.osd, m, get_osdmap_epoch());
- }
-}
-
-bool PG::append_log_entries_update_missing(
- const mempool::osd_pglog::list<pg_log_entry_t> &entries,
- ObjectStore::Transaction &t, boost::optional<eversion_t> trim_to,
- boost::optional<eversion_t> roll_forward_to)
-{
- ceph_assert(!entries.empty());
- ceph_assert(entries.begin()->version > info.last_update);
-
- PGLogEntryHandler rollbacker{this, &t};
- bool invalidate_stats =
- pg_log.append_new_log_entries(info.last_backfill,
- info.last_backfill_bitwise,
- entries,
- &rollbacker);
-
- if (roll_forward_to && entries.rbegin()->soid > info.last_backfill) {
- pg_log.roll_forward(&rollbacker);
- }
- if (roll_forward_to && *roll_forward_to > pg_log.get_can_rollback_to()) {
- pg_log.roll_forward_to(*roll_forward_to, &rollbacker);
- last_rollback_info_trimmed_to_applied = *roll_forward_to;
- }
-
- info.last_update = pg_log.get_head();
-
- if (pg_log.get_missing().num_missing() == 0) {
- // advance last_complete since nothing else is missing!
- info.last_complete = info.last_update;
- }
- info.stats.stats_invalid = info.stats.stats_invalid || invalidate_stats;
-
- dout(20) << __func__ << " trim_to bool = " << bool(trim_to) << " trim_to = " << (trim_to ? *trim_to : eversion_t()) << dendl;
- if (trim_to)
- pg_log.trim(*trim_to, info);
- dirty_info = true;
- write_if_dirty(t);
- return invalidate_stats;
-}
-
-
-void PG::merge_new_log_entries(
- const mempool::osd_pglog::list<pg_log_entry_t> &entries,
- ObjectStore::Transaction &t,
- boost::optional<eversion_t> trim_to,
- boost::optional<eversion_t> roll_forward_to)
-{
- dout(10) << __func__ << " " << entries << dendl;
- ceph_assert(is_primary());
-
- bool rebuild_missing = append_log_entries_update_missing(entries, t, trim_to, roll_forward_to);
- for (set<pg_shard_t>::const_iterator i = acting_recovery_backfill.begin();
- i != acting_recovery_backfill.end();
- ++i) {
- pg_shard_t peer(*i);
- if (peer == pg_whoami) continue;
- ceph_assert(peer_missing.count(peer));
- ceph_assert(peer_info.count(peer));
- pg_missing_t& pmissing(peer_missing[peer]);
- dout(20) << __func__ << " peer_missing for " << peer << " = " << pmissing << dendl;
- pg_info_t& pinfo(peer_info[peer]);
- bool invalidate_stats = PGLog::append_log_entries_update_missing(
- pinfo.last_backfill,
- info.last_backfill_bitwise,
- entries,
- true,
- NULL,
- pmissing,
- NULL,
- this);
- pinfo.last_update = info.last_update;
- pinfo.stats.stats_invalid = pinfo.stats.stats_invalid || invalidate_stats;
- rebuild_missing = rebuild_missing || invalidate_stats;
- }
-
- if (!rebuild_missing) {
- return;
- }
-
- for (auto &&i: entries) {
- missing_loc.rebuild(
- i.soid,
- pg_whoami,
- acting_recovery_backfill,
- info,
- pg_log.get_missing(),
- peer_missing,
- peer_info);
- }
-}
-
-void PG::update_history(const pg_history_t& new_history)
-{
- unreg_next_scrub();
- if (info.history.merge(new_history)) {
- dout(20) << __func__ << " advanced history from " << new_history << dendl;
- dirty_info = true;
- if (info.history.last_epoch_clean >= info.history.same_interval_since) {
- dout(20) << __func__ << " clearing past_intervals" << dendl;
- past_intervals.clear();
- dirty_big_info = true;
- }
- }
- reg_next_scrub();
-}
-
-void PG::fulfill_info(
- pg_shard_t from, const pg_query_t &query,
- pair<pg_shard_t, pg_info_t> ¬ify_info)
-{
- ceph_assert(from == primary);
- ceph_assert(query.type == pg_query_t::INFO);
-
- // info
- dout(10) << "sending info" << dendl;
- notify_info = make_pair(from, info);
-}
-
-void PG::fulfill_log(
- pg_shard_t from, const pg_query_t &query, epoch_t query_epoch)
-{
- dout(10) << "log request from " << from << dendl;
- ceph_assert(from == primary);
- ceph_assert(query.type != pg_query_t::INFO);
- ConnectionRef con = osd->get_con_osd_cluster(
- from.osd, get_osdmap_epoch());
- if (!con) return;
-
- MOSDPGLog *mlog = new MOSDPGLog(
- from.shard, pg_whoami.shard,
- get_osdmap_epoch(),
- info, query_epoch);
- mlog->missing = pg_log.get_missing();
-
- // primary -> other, when building master log
- if (query.type == pg_query_t::LOG) {
- dout(10) << " sending info+missing+log since " << query.since
- << dendl;
- if (query.since != eversion_t() && query.since < pg_log.get_tail()) {
- osd->clog->error() << info.pgid << " got broken pg_query_t::LOG since " << query.since
- << " when my log.tail is " << pg_log.get_tail()
- << ", sending full log instead";
- mlog->log = pg_log.get_log(); // primary should not have requested this!!
- } else
- mlog->log.copy_after(pg_log.get_log(), query.since);
- }
- else if (query.type == pg_query_t::FULLLOG) {
- dout(10) << " sending info+missing+full log" << dendl;
- mlog->log = pg_log.get_log();
- }
-
- dout(10) << " sending " << mlog->log << " " << mlog->missing << dendl;
-
- osd->share_map_peer(from.osd, con.get(), get_osdmap());
- osd->send_message_osd_cluster(mlog, con.get());
-}
-
-void PG::fulfill_query(const MQuery& query, RecoveryCtx *rctx)
-{
- if (query.query.type == pg_query_t::INFO) {
- pair<pg_shard_t, pg_info_t> notify_info;
- update_history(query.query.history);
- fulfill_info(query.from, query.query, notify_info);
- rctx->send_notify(
- notify_info.first,
- pg_notify_t(
- notify_info.first.shard, pg_whoami.shard,
- query.query_epoch,
- get_osdmap_epoch(),
- notify_info.second),
- past_intervals);
- } else {
- update_history(query.query.history);
- fulfill_log(query.from, query.query, query.query_epoch);
- }
-}
-
-void PG::check_full_transition(OSDMapRef lastmap, OSDMapRef osdmap)
-{
- bool changed = false;
- if (osdmap->test_flag(CEPH_OSDMAP_FULL) &&
- !lastmap->test_flag(CEPH_OSDMAP_FULL)) {
- dout(10) << " cluster was marked full in " << osdmap->get_epoch() << dendl;
- changed = true;
- }
- const pg_pool_t *pi = osdmap->get_pg_pool(info.pgid.pool());
- if (!pi) {
- return; // pool deleted
- }
- if (pi->has_flag(pg_pool_t::FLAG_FULL)) {
- const pg_pool_t *opi = lastmap->get_pg_pool(info.pgid.pool());
- if (!opi || !opi->has_flag(pg_pool_t::FLAG_FULL)) {
- dout(10) << " pool was marked full in " << osdmap->get_epoch() << dendl;
- changed = true;
- }
- }
- if (changed) {
- info.history.last_epoch_marked_full = osdmap->get_epoch();
- dirty_info = true;
- }
-}
-
-bool PG::should_restart_peering(
- int newupprimary,
- int newactingprimary,
- const vector<int>& newup,
- const vector<int>& newacting,
- OSDMapRef lastmap,
- OSDMapRef osdmap)
-{
- if (PastIntervals::is_new_interval(
- primary.osd,
- newactingprimary,
- acting,
- newacting,
- up_primary.osd,
- newupprimary,
- up,
- newup,
- osdmap,
- lastmap,
- info.pgid.pgid)) {
- dout(20) << "new interval newup " << newup
- << " newacting " << newacting << dendl;
- return true;
- }
- if (!lastmap->is_up(osd->whoami) && osdmap->is_up(osd->whoami)) {
- dout(10) << __func__ << " osd transitioned from down -> up" << dendl;
- return true;
- }
- return false;
-}
-
-bool PG::old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch)
-{
- if (last_peering_reset > reply_epoch ||
- last_peering_reset > query_epoch) {
- dout(10) << "old_peering_msg reply_epoch " << reply_epoch << " query_epoch " << query_epoch
- << " last_peering_reset " << last_peering_reset
- << dendl;
- return true;
- }
- return false;
-}
-
-void PG::set_last_peering_reset()
-{
- dout(20) << "set_last_peering_reset " << get_osdmap_epoch() << dendl;
- if (last_peering_reset != get_osdmap_epoch()) {
- last_peering_reset = get_osdmap_epoch();
- reset_interval_flush();
- }
-}
-
-struct FlushState {
- PGRef pg;
- epoch_t epoch;
- FlushState(PG *pg, epoch_t epoch) : pg(pg), epoch(epoch) {}
- ~FlushState() {
- pg->lock();
- if (!pg->pg_has_reset_since(epoch))
- pg->on_flushed();
- pg->unlock();
- }
-};
-typedef std::shared_ptr<FlushState> FlushStateRef;
-
-void PG::start_flush(ObjectStore::Transaction *t)
-{
- // flush in progress ops
- FlushStateRef flush_trigger (std::make_shared<FlushState>(
- this, get_osdmap_epoch()));
- flushes_in_progress++;
- t->register_on_applied(new ContainerContext<FlushStateRef>(flush_trigger));
- t->register_on_commit(new ContainerContext<FlushStateRef>(flush_trigger));
-}
-
-void PG::reset_interval_flush()
-{
- dout(10) << "Clearing blocked outgoing recovery messages" << dendl;
- recovery_state.clear_blocked_outgoing();
-
- Context *c = new QueuePeeringEvt<IntervalFlush>(
- this, get_osdmap_epoch(), IntervalFlush());
- if (!ch->flush_commit(c)) {
- dout(10) << "Beginning to block outgoing recovery messages" << dendl;
- recovery_state.begin_block_outgoing();
- } else {
- dout(10) << "Not blocking outgoing recovery messages" << dendl;
- delete c;
- }
-}
-
-/* Called before initializing peering during advance_map */
-void PG::start_peering_interval(
- const OSDMapRef lastmap,
- const vector<int>& newup, int new_up_primary,
- const vector<int>& newacting, int new_acting_primary,
- ObjectStore::Transaction *t)
-{
- const OSDMapRef osdmap = get_osdmap();
-
- set_last_peering_reset();
-
- vector<int> oldacting, oldup;
- int oldrole = get_role();
-
- unreg_next_scrub();
-
- if (is_primary()) {
- osd->clear_ready_to_merge(this);
- }
-
- pg_shard_t old_acting_primary = get_primary();
- pg_shard_t old_up_primary = up_primary;
- bool was_old_primary = is_primary();
- bool was_old_replica = is_replica();
-
- acting.swap(oldacting);
- up.swap(oldup);
- init_primary_up_acting(
- newup,
- newacting,
- new_up_primary,
- new_acting_primary);
-
- if (info.stats.up != up ||
- info.stats.acting != acting ||
- info.stats.up_primary != new_up_primary ||
- info.stats.acting_primary != new_acting_primary) {
- info.stats.up = up;
- info.stats.up_primary = new_up_primary;
- info.stats.acting = acting;
- info.stats.acting_primary = new_acting_primary;
- info.stats.mapping_epoch = osdmap->get_epoch();
- }
-
- pg_stats_publish_lock.Lock();
- pg_stats_publish_valid = false;
- pg_stats_publish_lock.Unlock();
-
- // This will now be remapped during a backfill in cases
- // that it would not have been before.
- if (up != acting)
- state_set(PG_STATE_REMAPPED);
- else
- state_clear(PG_STATE_REMAPPED);
-
- int role = osdmap->calc_pg_role(osd->whoami, acting, acting.size());
- if (pool.info.is_replicated() || role == pg_whoami.shard)
- set_role(role);
- else
- set_role(-1);
-
- // did acting, up, primary|acker change?
- if (!lastmap) {
- dout(10) << " no lastmap" << dendl;
- dirty_info = true;
- dirty_big_info = true;
- info.history.same_interval_since = osdmap->get_epoch();
- } else {
- std::stringstream debug;
- ceph_assert(info.history.same_interval_since != 0);
- boost::scoped_ptr<IsPGRecoverablePredicate> recoverable(
- get_is_recoverable_predicate());
- bool new_interval = PastIntervals::check_new_interval(
- old_acting_primary.osd,
- new_acting_primary,
- oldacting, newacting,
- old_up_primary.osd,
- new_up_primary,
- oldup, newup,
- info.history.same_interval_since,
- info.history.last_epoch_clean,
- osdmap,
- lastmap,
- info.pgid.pgid,
- recoverable.get(),
- &past_intervals,
- &debug);
- dout(10) << __func__ << ": check_new_interval output: "
- << debug.str() << dendl;
- if (new_interval) {
- if (osdmap->get_epoch() == osd->get_superblock().oldest_map &&
- info.history.last_epoch_clean < osdmap->get_epoch()) {
- dout(10) << " map gap, clearing past_intervals and faking" << dendl;
- // our information is incomplete and useless; someone else was clean
- // after everything we know if osdmaps were trimmed.
- past_intervals.clear();
- } else {
- dout(10) << " noting past " << past_intervals << dendl;
- }
- dirty_info = true;
- dirty_big_info = true;
- info.history.same_interval_since = osdmap->get_epoch();
- if (osdmap->have_pg_pool(info.pgid.pgid.pool()) &&
- info.pgid.pgid.is_split(lastmap->get_pg_num(info.pgid.pgid.pool()),
- osdmap->get_pg_num(info.pgid.pgid.pool()),
- nullptr)) {
- info.history.last_epoch_split = osdmap->get_epoch();
- }
- }
- }
-
- if (old_up_primary != up_primary ||
- oldup != up) {
- info.history.same_up_since = osdmap->get_epoch();
- }
- // this comparison includes primary rank via pg_shard_t
- if (old_acting_primary != get_primary()) {
- info.history.same_primary_since = osdmap->get_epoch();
- }
-
- on_new_interval();
-
- dout(1) << __func__ << " up " << oldup << " -> " << up
- << ", acting " << oldacting << " -> " << acting
- << ", acting_primary " << old_acting_primary << " -> " << new_acting_primary
- << ", up_primary " << old_up_primary << " -> " << new_up_primary
- << ", role " << oldrole << " -> " << role
- << ", features acting " << acting_features
- << " upacting " << upacting_features
- << dendl;
-
- // deactivate.
- state_clear(PG_STATE_ACTIVE);
- state_clear(PG_STATE_PEERED);
- state_clear(PG_STATE_PREMERGE);
- state_clear(PG_STATE_DOWN);
- state_clear(PG_STATE_RECOVERY_WAIT);
- state_clear(PG_STATE_RECOVERY_TOOFULL);
- state_clear(PG_STATE_RECOVERING);
-
- peer_purged.clear();
- acting_recovery_backfill.clear();
- scrub_queued = false;
-
- // reset primary/replica state?
- if (was_old_primary || is_primary()) {
- osd->remove_want_pg_temp(info.pgid.pgid);
- } else if (was_old_replica || is_replica()) {
- osd->remove_want_pg_temp(info.pgid.pgid);
- }
- clear_primary_state();
-
-
- // pg->on_*
- on_change(t);
-
- projected_last_update = eversion_t();
-
- ceph_assert(!deleting);
-
- // should we tell the primary we are here?
- send_notify = !is_primary();
-
- if (role != oldrole ||
- was_old_primary != is_primary()) {
- // did primary change?
- if (was_old_primary != is_primary()) {
- state_clear(PG_STATE_CLEAN);
- clear_publish_stats();
- }
-
- on_role_change();
-
- // take active waiters
- requeue_ops(waiting_for_peered);
-
- } else {
- // no role change.
- // did primary change?
- if (get_primary() != old_acting_primary) {
- dout(10) << *this << " " << oldacting << " -> " << acting
- << ", acting primary "
- << old_acting_primary << " -> " << get_primary()
- << dendl;
- } else {
- // primary is the same.
- if (is_primary()) {
- // i am (still) primary. but my replica set changed.
- state_clear(PG_STATE_CLEAN);
-
- dout(10) << oldacting << " -> " << acting
- << ", replicas changed" << dendl;
- }
- }
- }
- cancel_recovery();
-
- if (acting.empty() && !up.empty() && up_primary == pg_whoami) {
- dout(10) << " acting empty, but i am up[0], clearing pg_temp" << dendl;
- osd->queue_want_pg_temp(info.pgid.pgid, acting);
- }
-}
-
-void PG::on_new_interval()
-{
- const OSDMapRef osdmap = get_osdmap();
-
- reg_next_scrub();
-
- // initialize features
- acting_features = CEPH_FEATURES_SUPPORTED_DEFAULT;
- upacting_features = CEPH_FEATURES_SUPPORTED_DEFAULT;
- for (vector<int>::iterator p = acting.begin(); p != acting.end(); ++p) {
- if (*p == CRUSH_ITEM_NONE)
- continue;
- uint64_t f = osdmap->get_xinfo(*p).features;
- acting_features &= f;
- upacting_features &= f;
- }
- for (vector<int>::iterator p = up.begin(); p != up.end(); ++p) {
- if (*p == CRUSH_ITEM_NONE)
- continue;
- upacting_features &= osdmap->get_xinfo(*p).features;
- }
-
- _on_new_interval();
-}
-
-void PG::proc_primary_info(ObjectStore::Transaction &t, const pg_info_t &oinfo)
-{
- ceph_assert(!is_primary());
-
- update_history(oinfo.history);
- if (!info.stats.stats_invalid && info.stats.stats.sum.num_scrub_errors) {
- info.stats.stats.sum.num_scrub_errors = 0;
- info.stats.stats.sum.num_shallow_scrub_errors = 0;
- info.stats.stats.sum.num_deep_scrub_errors = 0;
- dirty_info = true;
- }
-
- if (!(info.purged_snaps == oinfo.purged_snaps)) {
- dout(10) << __func__ << " updating purged_snaps to " << oinfo.purged_snaps
- << dendl;
- info.purged_snaps = oinfo.purged_snaps;
- dirty_info = true;
- dirty_big_info = true;
- }
-}
-
-ostream& operator<<(ostream& out, const PG& pg)
-{
- out << "pg[" << pg.info
- << " " << pg.up;
- if (pg.acting != pg.up)
- out << "/" << pg.acting;
- if (pg.is_ec_pg())
- out << "p" << pg.get_primary();
- if (!pg.async_recovery_targets.empty())
- out << " async=[" << pg.async_recovery_targets << "]";
- if (!pg.backfill_targets.empty())
- out << " backfill=[" << pg.backfill_targets << "]";
- out << " r=" << pg.get_role();
- out << " lpr=" << pg.get_last_peering_reset();
-
- if (pg.deleting)
- out << " DELETING";
-
- if (!pg.past_intervals.empty()) {
- out << " pi=[" << pg.past_intervals.get_bounds()
- << ")/" << pg.past_intervals.size();
- }
-
- if (pg.is_peered()) {
- if (pg.last_update_ondisk != pg.info.last_update)
- out << " luod=" << pg.last_update_ondisk;
- if (pg.last_update_applied != pg.info.last_update)
- out << " lua=" << pg.last_update_applied;
- }
-
- if (pg.recovery_ops_active)
- out << " rops=" << pg.recovery_ops_active;
-
- if (pg.pg_log.get_tail() != pg.info.log_tail ||
- pg.pg_log.get_head() != pg.info.last_update)
- out << " (info mismatch, " << pg.pg_log.get_log() << ")";
-
- if (!pg.pg_log.get_log().empty()) {
- if ((pg.pg_log.get_log().log.begin()->version <= pg.pg_log.get_tail())) {
- out << " (log bound mismatch, actual=["
- << pg.pg_log.get_log().log.begin()->version << ","
- << pg.pg_log.get_log().log.rbegin()->version << "]";
- out << ")";
- }
- }
-
- out << " crt=" << pg.pg_log.get_can_rollback_to();
-
- if (pg.last_complete_ondisk != pg.info.last_complete)
- out << " lcod " << pg.last_complete_ondisk;
-
- if (pg.is_primary()) {
- out << " mlcod " << pg.min_last_complete_ondisk;
- }
-
- out << " " << pg_state_string(pg.get_state());
- if (pg.should_send_notify())
- out << " NOTIFY";
-
- if (pg.scrubber.must_repair)
- out << " MUST_REPAIR";
- if (pg.scrubber.auto_repair)
- out << " AUTO_REPAIR";
- if (pg.scrubber.check_repair)
- out << " CHECK_REPAIR";
- if (pg.scrubber.deep_scrub_on_error)
- out << " DEEP_SCRUB_ON_ERROR";
- if (pg.scrubber.must_deep_scrub)
- out << " MUST_DEEP_SCRUB";
- if (pg.scrubber.must_scrub)
- out << " MUST_SCRUB";
-
- //out << " (" << pg.pg_log.get_tail() << "," << pg.pg_log.get_head() << "]";
- if (pg.pg_log.get_missing().num_missing()) {
- out << " m=" << pg.pg_log.get_missing().num_missing();
- if (pg.is_primary()) {
- uint64_t unfound = pg.get_num_unfound();
- if (unfound)
- out << " u=" << unfound;
- }
- }
- if (!pg.is_clean()) {
- out << " mbc=" << pg.missing_loc.get_missing_by_count();
- }
- if (!pg.snap_trimq.empty()) {
- out << " trimq=";
- // only show a count if the set is large
- if (pg.snap_trimq.num_intervals() > 16) {
- out << pg.snap_trimq.size();
- } else {
- out << pg.snap_trimq;
- }
- }
- if (!pg.info.purged_snaps.empty()) {
- out << " ps="; // snap trim queue / purged snaps
- if (pg.info.purged_snaps.num_intervals() > 16) {
- out << pg.info.purged_snaps.size();
- } else {
- out << pg.info.purged_snaps;
- }
- }
-
- out << "]";
-
-
- return out;
-}
-
-bool PG::can_discard_op(OpRequestRef& op)
-{
- const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
- if (cct->_conf->osd_discard_disconnected_ops && OSD::op_is_discardable(m)) {
- dout(20) << " discard " << *m << dendl;
- return true;
- }
-
- if (m->get_map_epoch() < info.history.same_primary_since) {
- dout(7) << " changed after " << m->get_map_epoch()
- << ", dropping " << *m << dendl;
- return true;
- }
-
- if (m->get_connection()->has_feature(CEPH_FEATURE_RESEND_ON_SPLIT)) {
- // >= luminous client
- if (m->get_connection()->has_feature(CEPH_FEATURE_SERVER_NAUTILUS)) {
- // >= nautilus client
- if (m->get_map_epoch() < pool.info.get_last_force_op_resend()) {
- dout(7) << __func__ << " sent before last_force_op_resend "
- << pool.info.last_force_op_resend
- << ", dropping" << *m << dendl;
- return true;
- }
- } else {
- // == < nautilus client (luminous or mimic)
- if (m->get_map_epoch() < pool.info.get_last_force_op_resend_prenautilus()) {
- dout(7) << __func__ << " sent before last_force_op_resend_prenautilus "
- << pool.info.last_force_op_resend_prenautilus
- << ", dropping" << *m << dendl;
- return true;
- }
- }
- if (m->get_map_epoch() < info.history.last_epoch_split) {
- dout(7) << __func__ << " pg split in "
- << info.history.last_epoch_split << ", dropping" << dendl;
- return true;
- }
- } else if (m->get_connection()->has_feature(CEPH_FEATURE_OSD_POOLRESEND)) {
- // < luminous client
- if (m->get_map_epoch() < pool.info.get_last_force_op_resend_preluminous()) {
- dout(7) << __func__ << " sent before last_force_op_resend_preluminous "
- << pool.info.last_force_op_resend_preluminous
- << ", dropping" << *m << dendl;
- return true;
- }
- }
-
- return false;
-}
-
-template<typename T, int MSGTYPE>
-bool PG::can_discard_replica_op(OpRequestRef& op)
-{
- const T *m = static_cast<const T *>(op->get_req());
- ceph_assert(m->get_type() == MSGTYPE);
-
- int from = m->get_source().num();
-
- // if a repop is replied after a replica goes down in a new osdmap, and
- // before the pg advances to this new osdmap, the repop replies before this
- // repop can be discarded by that replica OSD, because the primary resets the
- // connection to it when handling the new osdmap marking it down, and also
- // resets the messenger sesssion when the replica reconnects. to avoid the
- // out-of-order replies, the messages from that replica should be discarded.
- OSDMapRef next_map = osd->get_next_osdmap();
- if (next_map->is_down(from))
- return true;
- /* Mostly, this overlaps with the old_peering_msg
- * condition. An important exception is pushes
- * sent by replicas not in the acting set, since
- * if such a replica goes down it does not cause
- * a new interval. */
- if (next_map->get_down_at(from) >= m->map_epoch)
- return true;
-
- // same pg?
- // if pg changes _at all_, we reset and repeer!
- if (old_peering_msg(m->map_epoch, m->map_epoch)) {
- dout(10) << "can_discard_replica_op pg changed " << info.history
- << " after " << m->map_epoch
- << ", dropping" << dendl;
- return true;
- }
- return false;
-}
-
-bool PG::can_discard_scan(OpRequestRef op)
-{
- const MOSDPGScan *m = static_cast<const MOSDPGScan *>(op->get_req());
- ceph_assert(m->get_type() == MSG_OSD_PG_SCAN);
-
- if (old_peering_msg(m->map_epoch, m->query_epoch)) {
- dout(10) << " got old scan, ignoring" << dendl;
- return true;
- }
- return false;
-}
-
-bool PG::can_discard_backfill(OpRequestRef op)
-{
- const MOSDPGBackfill *m = static_cast<const MOSDPGBackfill *>(op->get_req());
- ceph_assert(m->get_type() == MSG_OSD_PG_BACKFILL);
-
- if (old_peering_msg(m->map_epoch, m->query_epoch)) {
- dout(10) << " got old backfill, ignoring" << dendl;
- return true;
- }
-
- return false;
-
-}
-
-bool PG::can_discard_request(OpRequestRef& op)
-{
- switch (op->get_req()->get_type()) {
- case CEPH_MSG_OSD_OP:
- return can_discard_op(op);
- case CEPH_MSG_OSD_BACKOFF:
- return false; // never discard
- case MSG_OSD_REPOP:
- return can_discard_replica_op<MOSDRepOp, MSG_OSD_REPOP>(op);
- case MSG_OSD_PG_PUSH:
- return can_discard_replica_op<MOSDPGPush, MSG_OSD_PG_PUSH>(op);
- case MSG_OSD_PG_PULL:
- return can_discard_replica_op<MOSDPGPull, MSG_OSD_PG_PULL>(op);
- case MSG_OSD_PG_PUSH_REPLY:
- return can_discard_replica_op<MOSDPGPushReply, MSG_OSD_PG_PUSH_REPLY>(op);
- case MSG_OSD_REPOPREPLY:
- return can_discard_replica_op<MOSDRepOpReply, MSG_OSD_REPOPREPLY>(op);
- case MSG_OSD_PG_RECOVERY_DELETE:
- return can_discard_replica_op<MOSDPGRecoveryDelete, MSG_OSD_PG_RECOVERY_DELETE>(op);
-
- case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
- return can_discard_replica_op<MOSDPGRecoveryDeleteReply, MSG_OSD_PG_RECOVERY_DELETE_REPLY>(op);
-
- case MSG_OSD_EC_WRITE:
- return can_discard_replica_op<MOSDECSubOpWrite, MSG_OSD_EC_WRITE>(op);
- case MSG_OSD_EC_WRITE_REPLY:
- return can_discard_replica_op<MOSDECSubOpWriteReply, MSG_OSD_EC_WRITE_REPLY>(op);
- case MSG_OSD_EC_READ:
- return can_discard_replica_op<MOSDECSubOpRead, MSG_OSD_EC_READ>(op);
- case MSG_OSD_EC_READ_REPLY:
- return can_discard_replica_op<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op);
- case MSG_OSD_REP_SCRUB:
- return can_discard_replica_op<MOSDRepScrub, MSG_OSD_REP_SCRUB>(op);
- case MSG_OSD_SCRUB_RESERVE:
- return can_discard_replica_op<MOSDScrubReserve, MSG_OSD_SCRUB_RESERVE>(op);
- case MSG_OSD_REP_SCRUBMAP:
- return can_discard_replica_op<MOSDRepScrubMap, MSG_OSD_REP_SCRUBMAP>(op);
- case MSG_OSD_PG_UPDATE_LOG_MISSING:
- return can_discard_replica_op<
- MOSDPGUpdateLogMissing, MSG_OSD_PG_UPDATE_LOG_MISSING>(op);
- case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
- return can_discard_replica_op<
- MOSDPGUpdateLogMissingReply, MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY>(op);
-
- case MSG_OSD_PG_SCAN:
- return can_discard_scan(op);
- case MSG_OSD_PG_BACKFILL:
- return can_discard_backfill(op);
- case MSG_OSD_PG_BACKFILL_REMOVE:
- return can_discard_replica_op<MOSDPGBackfillRemove,
- MSG_OSD_PG_BACKFILL_REMOVE>(op);
- }
- return true;
-}
-
-void PG::take_waiters()
-{
- dout(10) << "take_waiters" << dendl;
- requeue_map_waiters();
-}
-
-void PG::do_peering_event(PGPeeringEventRef evt, RecoveryCtx *rctx)
-{
- dout(10) << __func__ << ": " << evt->get_desc() << dendl;
- ceph_assert(have_same_or_newer_map(evt->get_epoch_sent()));
- if (old_peering_evt(evt)) {
- dout(10) << "discard old " << evt->get_desc() << dendl;
- } else {
- recovery_state.handle_event(evt, rctx);
- }
- // write_if_dirty regardless of path above to ensure we capture any work
- // done by OSD::advance_pg().
- write_if_dirty(*rctx->transaction);
-}
-
-void PG::queue_peering_event(PGPeeringEventRef evt)
-{
- if (old_peering_evt(evt))
- return;
- osd->osd->enqueue_peering_evt(info.pgid, evt);
-}
-
-void PG::queue_null(epoch_t msg_epoch,
- epoch_t query_epoch)
-{
- dout(10) << "null" << dendl;
- queue_peering_event(
- PGPeeringEventRef(std::make_shared<PGPeeringEvent>(msg_epoch, query_epoch,
- NullEvt())));
-}
-
-void PG::find_unfound(epoch_t queued, RecoveryCtx *rctx)
-{
- /*
- * if we couldn't start any recovery ops and things are still
- * unfound, see if we can discover more missing object locations.
- * It may be that our initial locations were bad and we errored
- * out while trying to pull.
- */
- discover_all_missing(*rctx->query_map);
- if (rctx->query_map->empty()) {
- string action;
- if (state_test(PG_STATE_BACKFILLING)) {
- auto evt = PGPeeringEventRef(
- new PGPeeringEvent(
- queued,
- queued,
- PG::UnfoundBackfill()));
- queue_peering_event(evt);
- action = "in backfill";
- } else if (state_test(PG_STATE_RECOVERING)) {
- auto evt = PGPeeringEventRef(
- new PGPeeringEvent(
- queued,
- queued,
- PG::UnfoundRecovery()));
- queue_peering_event(evt);
- action = "in recovery";
- } else {
- action = "already out of recovery/backfill";
- }
- dout(10) << __func__ << ": no luck, giving up on this pg for now (" << action << ")" << dendl;
- } else {
- dout(10) << __func__ << ": no luck, giving up on this pg for now (queue_recovery)" << dendl;
- queue_recovery();
- }
-}
-
-void PG::handle_advance_map(
- OSDMapRef osdmap, OSDMapRef lastmap,
- vector<int>& newup, int up_primary,
- vector<int>& newacting, int acting_primary,
- RecoveryCtx *rctx)
-{
- ceph_assert(lastmap->get_epoch() == osdmap_ref->get_epoch());
- ceph_assert(lastmap == osdmap_ref);
- dout(10) << "handle_advance_map "
- << newup << "/" << newacting
- << " -- " << up_primary << "/" << acting_primary
- << dendl;
- update_osdmap_ref(osdmap);
- osd_shard->update_pg_epoch(pg_slot, osdmap->get_epoch());
-
- pool.update(cct, osdmap);
-
- AdvMap evt(
- osdmap, lastmap, newup, up_primary,
- newacting, acting_primary);
- recovery_state.handle_event(evt, rctx);
- if (pool.info.last_change == osdmap_ref->get_epoch()) {
- on_pool_change();
- update_store_with_options();
- }
- last_require_osd_release = osdmap->require_osd_release;
-}
-
-void PG::handle_activate_map(RecoveryCtx *rctx)
-{
- dout(10) << "handle_activate_map " << dendl;
- ActMap evt;
- recovery_state.handle_event(evt, rctx);
- if (osdmap_ref->get_epoch() - last_persisted_osdmap >
- cct->_conf->osd_pg_epoch_persisted_max_stale) {
- dout(20) << __func__ << ": Dirtying info: last_persisted is "
- << last_persisted_osdmap
- << " while current is " << osdmap_ref->get_epoch() << dendl;
- dirty_info = true;
- } else {
- dout(20) << __func__ << ": Not dirtying info: last_persisted is "
- << last_persisted_osdmap
- << " while current is " << osdmap_ref->get_epoch() << dendl;
- }
- if (osdmap_ref->check_new_blacklist_entries()) {
- check_blacklisted_watchers();
- }
- write_if_dirty(*rctx->transaction);
-}
-
-void PG::handle_initialize(RecoveryCtx *rctx)
-{
- dout(10) << __func__ << dendl;
- Initialize evt;
- recovery_state.handle_event(evt, rctx);
-}
-
-void PG::handle_query_state(Formatter *f)
-{
- dout(10) << "handle_query_state" << dendl;
- QueryState q(f);
- recovery_state.handle_event(q, 0);
-}
-
-void PG::update_store_with_options()
-{
- auto r = osd->store->set_collection_opts(ch, pool.info.opts);
- if(r < 0 && r != -EOPNOTSUPP) {
- derr << __func__ << " set_collection_opts returns error:" << r << dendl;
- }
-}
-
-struct C_DeleteMore : public Context {
- PGRef pg;
- epoch_t epoch;
- C_DeleteMore(PG *p, epoch_t e) : pg(p), epoch(e) {}
- void finish(int r) override {
- ceph_abort();
- }
- void complete(int r) override {
- ceph_assert(r == 0);
- pg->lock();
- if (!pg->pg_has_reset_since(epoch)) {
- pg->osd->queue_for_pg_delete(pg->get_pgid(), epoch);
- }
- pg->unlock();
- delete this;
- }
-};
-
-void PG::_delete_some(ObjectStore::Transaction *t)
-{
- dout(10) << __func__ << dendl;
-
- {
- float osd_delete_sleep = osd->osd->get_osd_delete_sleep();
- if (osd_delete_sleep > 0 && delete_needs_sleep) {
- epoch_t e = get_osdmap()->get_epoch();
- PGRef pgref(this);
- auto delete_requeue_callback = new FunctionContext([this, pgref, e](int r) {
- dout(20) << __func__ << " wake up at "
- << ceph_clock_now()
- << ", re-queuing delete" << dendl;
- lock();
- delete_needs_sleep = false;
- if (!pg_has_reset_since(e)) {
- osd->queue_for_pg_delete(get_pgid(), e);
- }
- unlock();
- });
-
- utime_t delete_schedule_time = ceph_clock_now();
- delete_schedule_time += osd_delete_sleep;
- Mutex::Locker l(osd->sleep_lock);
- osd->sleep_timer.add_event_at(delete_schedule_time,
- delete_requeue_callback);
- dout(20) << __func__ << " Delete scheduled at " << delete_schedule_time << dendl;
- return;
- }
- }
-
- delete_needs_sleep = true;
-
- vector<ghobject_t> olist;
- int max = std::min(osd->store->get_ideal_list_max(),
- (int)cct->_conf->osd_target_transaction_size);
- ghobject_t next;
- osd->store->collection_list(
- ch,
- next,
- ghobject_t::get_max(),
- max,
- &olist,
- &next);
- dout(20) << __func__ << " " << olist << dendl;
-
- OSDriver::OSTransaction _t(osdriver.get_transaction(t));
- int64_t num = 0;
- for (auto& oid : olist) {
- if (oid.is_pgmeta()) {
- continue;
- }
- int r = snap_mapper.remove_oid(oid.hobj, &_t);
- if (r != 0 && r != -ENOENT) {
- ceph_abort();
- }
- t->remove(coll, oid);
- ++num;
- }
- if (num) {
- dout(20) << __func__ << " deleting " << num << " objects" << dendl;
- Context *fin = new C_DeleteMore(this, get_osdmap_epoch());
- t->register_on_commit(fin);
- } else {
- dout(20) << __func__ << " finished" << dendl;
- if (cct->_conf->osd_inject_failure_on_pg_removal) {
- _exit(1);
- }
-
- // final flush here to ensure completions drop refs. Of particular concern
- // are the SnapMapper ContainerContexts.
- {
- PGRef pgref(this);
- PGLog::clear_info_log(info.pgid, t);
- t->remove_collection(coll);
- t->register_on_commit(new ContainerContext<PGRef>(pgref));
- t->register_on_applied(new ContainerContext<PGRef>(pgref));
- osd->store->queue_transaction(ch, std::move(*t));
- }
- ch->flush();
-
- if (!osd->try_finish_pg_delete(this, pool.info.get_pg_num())) {
- dout(1) << __func__ << " raced with merge, reinstantiating" << dendl;
- ch = osd->store->create_new_collection(coll);
- _create(*t,
- info.pgid,
- info.pgid.get_split_bits(pool.info.get_pg_num()));
- _init(*t, info.pgid, &pool.info);
- last_epoch = 0; // to ensure pg epoch is also written
- dirty_info = true;
- dirty_big_info = true;
- } else {
- deleted = true;
-
- // cancel reserver here, since the PG is about to get deleted and the
- // exit() methods don't run when that happens.
- osd->local_reserver.cancel_reservation(info.pgid);
-
- osd->logger->dec(l_osd_pg_removing);
- }
- }
-}
-
-// Compute pending backfill data
-static int64_t pending_backfill(CephContext *cct, int64_t bf_bytes, int64_t local_bytes)
-{
- lgeneric_dout(cct, 20) << __func__ << " Adjust local usage " << (local_bytes >> 10) << "KiB"
- << " primary usage " << (bf_bytes >> 10) << "KiB" << dendl;
- return std::max((int64_t)0, bf_bytes - local_bytes);
-}
-
-int PG::pg_stat_adjust(osd_stat_t *ns)
-{
- osd_stat_t &new_stat = *ns;
- if (is_primary()) {
- return 0;
- }
- // Adjust the kb_used by adding pending backfill data
- uint64_t reserved_num_bytes = get_reserved_num_bytes();
-
- // For now we don't consider projected space gains here
- // I suggest we have an optional 2 pass backfill that frees up
- // space in a first pass. This could be triggered when at nearfull
- // or near to backfillfull.
- if (reserved_num_bytes > 0) {
- // TODO: Handle compression by adjusting by the PGs average
- // compression precentage.
- dout(20) << __func__ << " reserved_num_bytes " << (reserved_num_bytes >> 10) << "KiB"
- << " Before kb_used " << new_stat.statfs.kb_used() << "KiB" << dendl;
- if (new_stat.statfs.available > reserved_num_bytes)
- new_stat.statfs.available -= reserved_num_bytes;
- else
- new_stat.statfs.available = 0;
- dout(20) << __func__ << " After kb_used " << new_stat.statfs.kb_used() << "KiB" << dendl;
- return 1;
- }
- return 0;
-}
-