if (limit != eversion_t() &&
limit != pg_trim_to &&
pg_log.get_log().approx_size() > target) {
- size_t num_to_trim = pg_log.get_log().approx_size() - target;
- if (num_to_trim < cct->_conf->osd_pg_log_trim_min) {
+ size_t num_to_trim = MIN(pg_log.get_log().approx_size() - target,
+ cct->_conf->osd_pg_log_trim_max);
+ if (num_to_trim < cct->_conf->osd_pg_log_trim_min &&
+ cct->_conf->osd_pg_log_trim_max >= cct->_conf->osd_pg_log_trim_min) {
return;
}
list<pg_log_entry_t>::const_iterator it = pg_log.get_log().log.begin();
return;
}
- if (write_ordered &&
- scrubber.write_blocked_by_scrub(head)) {
+ if (write_ordered && scrubber.is_chunky_scrub_active() &&
+ write_blocked_by_scrub(head)) {
dout(20) << __func__ << ": waiting for scrub" << dendl;
waiting_for_scrub.push_back(op);
op->mark_delayed("waiting for scrub");
in_progress_proxy_ops.erase(p);
}
-void PrimaryLogPG::cancel_proxy_read(ProxyReadOpRef prdop)
+// Cancel an in-flight proxy read.  Rather than calling
+// osd->objecter->op_cancel() directly, the objecter tid (if any) is
+// appended to *tids; callers batch-cancel the collected tids afterwards
+// (see cancel_proxy_ops and the on_shutdown/on_change paths, which call
+// osd->objecter->op_cancel(tids, -ECANCELED)) -- presumably to avoid
+// calling into the Objecter from this context; verify lock ordering.
+void PrimaryLogPG::cancel_proxy_read(ProxyReadOpRef prdop,
+  vector<ceph_tid_t> *tids)
{
dout(10) << __func__ << " " << prdop->soid << dendl;
prdop->canceled = true;
// cancel objecter op, if we can
if (prdop->objecter_tid) {
+// record the tid for deferred cancellation by the caller
+ tids->push_back(prdop->objecter_tid);
- osd->objecter->op_cancel(prdop->objecter_tid, -ECANCELED);
+ tids->push_back(prdop->objecter_tid);
for (uint32_t i = 0; i < prdop->ops.size(); i++) {
prdop->ops[i].outdata.clear();
}
}
}
-void PrimaryLogPG::cancel_proxy_ops(bool requeue)
+// Cancel every in-flight proxy read and proxy write on this PG.
+// Objecter tids are collected into *tids; the caller is responsible for
+// passing them to osd->objecter->op_cancel() afterwards.
+void PrimaryLogPG::cancel_proxy_ops(bool requeue, vector<ceph_tid_t> *tids)
{
dout(10) << __func__ << dendl;
// cancel proxy reads
map<ceph_tid_t, ProxyReadOpRef>::iterator p = proxyread_ops.begin();
while (p != proxyread_ops.end()) {
- cancel_proxy_read((p++)->second);
+// post-increment: the cancel call presumably erases this map entry
+// (cancel_proxy_write visibly does), which would invalidate p -- verify
+ cancel_proxy_read((p++)->second, tids);
}
// cancel proxy writes
map<ceph_tid_t, ProxyWriteOpRef>::iterator q = proxywrite_ops.begin();
while (q != proxywrite_ops.end()) {
- cancel_proxy_write((q++)->second);
+ cancel_proxy_write((q++)->second, tids);
}
if (requeue) {
pwop->ctx = NULL;
}
-void PrimaryLogPG::cancel_proxy_write(ProxyWriteOpRef pwop)
+// Cancel an in-flight proxy write.  The objecter tid (if any) is
+// appended to *tids for deferred, batched cancellation by the caller
+// instead of cancelling it on the Objecter here.
+void PrimaryLogPG::cancel_proxy_write(ProxyWriteOpRef pwop,
+  vector<ceph_tid_t> *tids)
{
dout(10) << __func__ << " " << pwop->soid << dendl;
pwop->canceled = true;
// cancel objecter op, if we can
if (pwop->objecter_tid) {
- osd->objecter->op_cancel(pwop->objecter_tid, -ECANCELED);
+// collect the tid; the caller performs the actual op_cancel
+ tids->push_back(pwop->objecter_tid);
delete pwop->ctx;
pwop->ctx = NULL;
proxywrite_ops.erase(pwop->objecter_tid);
{
hobject_t hoid = obc ? obc->obs.oi.soid : missing_oid;
assert(hoid != hobject_t());
- if (scrubber.write_blocked_by_scrub(hoid)) {
+ if (write_blocked_by_scrub(hoid)) {
dout(10) << __func__ << " " << hoid
<< " blocked by scrub" << dendl;
if (op) {
bufferlist::iterator *bl_it)
{
dout(20) << __func__ << dendl;
+ bool skip_data_digest =
+ (osd->store->has_builtin_csum() && g_conf->osd_skip_data_digest) ||
+ g_conf->osd_distrust_data_digest;
auto& op = osd_op.op;
if (op.checksum.chunk_size > 0) {
// If there is a data digest and it is possible we are reading
// entire object, pass the digest.
boost::optional<uint32_t> maybe_crc;
- if (oi.is_data_digest() && op.checksum.offset == 0 &&
+ if (!skip_data_digest &&
+ oi.is_data_digest() && op.checksum.offset == 0 &&
op.checksum.length >= oi.size) {
maybe_crc = oi.data_digest;
}
{
dout(20) << __func__ << dendl;
ceph_osd_op& op = osd_op.op;
+ bool skip_data_digest =
+ (osd->store->has_builtin_csum() && g_conf->osd_skip_data_digest) ||
+ g_conf->osd_distrust_data_digest;
auto& oi = ctx->new_obs.oi;
uint64_t size = oi.size;
// If there is a data digest and it is possible we are reading
// entire object, pass the digest.
boost::optional<uint32_t> maybe_crc;
- if (oi.is_data_digest() && op.checksum.offset == 0 &&
+ if (!skip_data_digest &&
+ oi.is_data_digest() && op.checksum.offset == 0 &&
op.checksum.length >= oi.size) {
maybe_crc = oi.data_digest;
}
__u32 seq = oi.truncate_seq;
uint64_t size = oi.size;
bool trimmed_read = false;
+ bool skip_data_digest =
+ (osd->store->has_builtin_csum() && g_conf->osd_skip_data_digest) ||
+ g_conf->osd_distrust_data_digest;
// are we beyond truncate_size?
if ( (seq < op.extent.truncate_seq) &&
// If there is a data digest and it is possible we are reading
// entire object, pass the digest. FillInVerifyExtent will
// will check the oi.size again.
- if (oi.is_data_digest() && op.extent.offset == 0 &&
+ if (!skip_data_digest &&
+ oi.is_data_digest() && op.extent.offset == 0 &&
op.extent.length >= oi.size)
maybe_crc = oi.data_digest;
ctx->pending_async_reads.push_back(
<< " bytes from obj " << soid << dendl;
// whole object? can we verify the checksum?
- if (op.extent.length == oi.size && oi.is_data_digest()) {
+ if (!skip_data_digest &&
+ op.extent.length == oi.size && oi.is_data_digest()) {
uint32_t crc = osd_op.outdata.crc32c(-1);
if (oi.data_digest != crc) {
osd->clog->error() << info.pgid << std::hex
auto& op = osd_op.op;
auto& oi = ctx->new_obs.oi;
auto& soid = oi.soid;
+ bool skip_data_digest =
+ (osd->store->has_builtin_csum() && g_conf->osd_skip_data_digest) ||
+ g_conf->osd_distrust_data_digest;
if (op.extent.truncate_seq) {
dout(0) << "sparse_read does not support truncation sequence " << dendl;
// Maybe at first, there is no much whole objects. With continued use, more
// and more whole object exist. So from this point, for spare-read add
// checksum make sense.
- if (total_read == oi.size && oi.is_data_digest()) {
+ if (!skip_data_digest &&
+ total_read == oi.size && oi.is_data_digest()) {
uint32_t crc = data_bl.crc32c(-1);
if (oi.data_digest != crc) {
osd->clog->error() << info.pgid << std::hex
ObjectState& obs = ctx->new_obs;
object_info_t& oi = obs.oi;
const hobject_t& soid = oi.soid;
+ bool skip_data_digest =
+ (osd->store->has_builtin_csum() && g_conf->osd_skip_data_digest) ||
+ g_conf->osd_distrust_data_digest;
PGTransaction* t = ctx->op_t.get();
soid, op.extent.offset, op.extent.length, osd_op.indata, op.flags);
}
- if (op.extent.offset == 0 && op.extent.length >= oi.size)
+ if (op.extent.offset == 0 && op.extent.length >= oi.size
+ && !skip_data_digest) {
obs.oi.set_data_digest(osd_op.indata.crc32c(-1));
- else if (op.extent.offset == oi.size && obs.oi.is_data_digest())
- obs.oi.set_data_digest(osd_op.indata.crc32c(obs.oi.data_digest));
- else
+ } else if (op.extent.offset == oi.size && obs.oi.is_data_digest()) {
+ if (skip_data_digest) {
+ obs.oi.clear_data_digest();
+ } else {
+ obs.oi.set_data_digest(osd_op.indata.crc32c(obs.oi.data_digest));
+ }
+ } else {
obs.oi.clear_data_digest();
+ }
write_update_size_and_usage(ctx->delta_stats, oi, ctx->modified_ranges,
op.extent.offset, op.extent.length);
if (op.extent.length) {
t->write(soid, 0, op.extent.length, osd_op.indata, op.flags);
}
- obs.oi.set_data_digest(osd_op.indata.crc32c(-1));
+ if (!skip_data_digest) {
+ obs.oi.set_data_digest(osd_op.indata.crc32c(-1));
+ } else {
+ obs.oi.clear_data_digest();
+ }
write_update_size_and_usage(ctx->delta_stats, oi, ctx->modified_ranges,
0, op.extent.length, true);
int result = 0;
object_copy_cursor_t cursor;
uint64_t out_max;
+ bool skip_data_digest =
+ (osd->store->has_builtin_csum() && g_conf->osd_skip_data_digest) ||
+ g_conf->osd_distrust_data_digest;
+
try {
::decode(cursor, bp);
::decode(out_max, bp);
} else {
reply_obj.snap_seq = obc->ssc->snapset.seq;
}
- if (oi.is_data_digest()) {
+ if (!skip_data_digest && oi.is_data_digest()) {
reply_obj.flags |= object_copy_data_t::FLAG_DATA_DIGEST;
reply_obj.data_digest = oi.data_digest;
}
// FIXME: if the src etc match, we could avoid restarting from the
// beginning.
CopyOpRef cop = copy_ops[dest];
- cancel_copy(cop, false);
+ vector<ceph_tid_t> tids;
+ cancel_copy(cop, false, &tids);
+ osd->objecter->op_cancel(tids, -ECANCELED);
}
CopyOpRef cop(std::make_shared<CopyOp>(cb, obc, src, oloc, version, flags,
void PrimaryLogPG::process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r)
{
+ vector<ceph_tid_t> tids;
dout(10) << __func__ << " " << oid << " tid " << tid
<< " " << cpp_strerror(r) << dendl;
map<hobject_t,CopyOpRef>::iterator p = copy_ops.find(oid);
for (map<ceph_tid_t, ProxyReadOpRef>::iterator it = proxyread_ops.begin();
it != proxyread_ops.end();) {
if (it->second->soid == cobc->obs.oi.soid) {
- cancel_proxy_read((it++)->second);
+ cancel_proxy_read((it++)->second, &tids);
} else {
++it;
}
for (map<ceph_tid_t, ProxyWriteOpRef>::iterator it = proxywrite_ops.begin();
it != proxywrite_ops.end();) {
if (it->second->soid == cobc->obs.oi.soid) {
- cancel_proxy_write((it++)->second);
+ cancel_proxy_write((it++)->second, &tids);
} else {
++it;
}
}
+ osd->objecter->op_cancel(tids, -ECANCELED);
kick_proxy_ops_blocked(cobc->obs.oi.soid);
}
kick_object_context_blocked(cobc);
}
+// Cancel all proxy reads/writes targeting a single object, batch-cancel
+// their objecter tids, then kick any proxy ops blocked on that object.
+void PrimaryLogPG::cancel_and_requeue_proxy_ops(hobject_t oid) {
+ vector<ceph_tid_t> tids;
+ for (map<ceph_tid_t, ProxyReadOpRef>::iterator it = proxyread_ops.begin();
+ it != proxyread_ops.end();) {
+ if (it->second->soid == oid) {
+// post-increment before cancelling, so erasure of the current entry
+// inside cancel_proxy_read cannot invalidate our iterator
+ cancel_proxy_read((it++)->second, &tids);
+ } else {
+ ++it;
+ }
+ }
+ for (map<ceph_tid_t, ProxyWriteOpRef>::iterator it = proxywrite_ops.begin();
+ it != proxywrite_ops.end();) {
+ if (it->second->soid == oid) {
+ cancel_proxy_write((it++)->second, &tids);
+ } else {
+ ++it;
+ }
+ }
+// single batched Objecter cancellation for every collected tid
+ osd->objecter->op_cancel(tids, -ECANCELED);
+ kick_proxy_ops_blocked(oid);
+}
+
void PrimaryLogPG::_write_copy_chunk(CopyOpRef cop, PGTransaction *t)
{
dout(20) << __func__ << " " << cop
// CopyFromCallback fills this in for us
obs.oi.user_version = ctx->user_at_version;
- obs.oi.set_data_digest(cb->results->data_digest);
- obs.oi.set_omap_digest(cb->results->omap_digest);
+ if (cb->results->is_data_digest()) {
+ obs.oi.set_data_digest(cb->results->data_digest);
+ } else {
+ obs.oi.clear_data_digest();
+ }
+ if (cb->results->is_omap_digest()) {
+ obs.oi.set_omap_digest(cb->results->omap_digest);
+ } else {
+ obs.oi.clear_omap_digest();
+ }
obs.oi.truncate_seq = cb->results->truncate_seq;
obs.oi.truncate_size = cb->results->truncate_size;
}
tctx->new_obs.oi.size = results->object_size;
tctx->new_obs.oi.user_version = results->user_version;
- // Don't care src object whether have data or omap digest
- if (results->object_size)
+ if (results->is_data_digest()) {
tctx->new_obs.oi.set_data_digest(results->data_digest);
- if (results->has_omap)
+ } else {
+ tctx->new_obs.oi.clear_data_digest();
+ }
+ if (results->is_omap_digest()) {
tctx->new_obs.oi.set_omap_digest(results->omap_digest);
+ } else {
+ tctx->new_obs.oi.clear_omap_digest();
+ }
tctx->new_obs.oi.truncate_seq = results->truncate_seq;
tctx->new_obs.oi.truncate_size = results->truncate_size;
agent_choose_mode();
}
-void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue)
+// Cancel an in-flight copy-from operation.  Both objecter tids (the
+// main op and the secondary tid2) are appended to *tids for deferred,
+// batched cancellation by the caller.
+void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue,
+ vector<ceph_tid_t> *tids)
{
dout(10) << __func__ << " " << cop->obc->obs.oi.soid
<< " from " << cop->src << " " << cop->oloc
// cancel objecter op, if we can
if (cop->objecter_tid) {
- osd->objecter->op_cancel(cop->objecter_tid, -ECANCELED);
+// collect the tid; the caller performs the actual op_cancel
+ tids->push_back(cop->objecter_tid);
cop->objecter_tid = 0;
if (cop->objecter_tid2) {
+ tids->push_back(cop->objecter_tid2);
- osd->objecter->op_cancel(cop->objecter_tid2, -ECANCELED);
+ tids->push_back(cop->objecter_tid2);
cop->objecter_tid2 = 0;
}
}
+// drop the object context reference held by this copy op
cop->obc = ObjectContextRef();
}
-void PrimaryLogPG::cancel_copy_ops(bool requeue)
+// Cancel every in-flight copy op on this PG, collecting objecter tids
+// into *tids for the caller to batch-cancel.
+void PrimaryLogPG::cancel_copy_ops(bool requeue, vector<ceph_tid_t> *tids)
{
dout(10) << __func__ << dendl;
map<hobject_t,CopyOpRef>::iterator p = copy_ops.begin();
while (p != copy_ops.end()) {
// requeue this op? can I queue up all of them?
- cancel_copy((p++)->second, requeue);
+ cancel_copy((p++)->second, requeue, tids);
}
}
osd->reply_op_error(fop->dup_ops.front(), -EBUSY);
fop->dup_ops.pop_front();
}
- cancel_flush(fop, false);
+ vector<ceph_tid_t> tids;
+ cancel_flush(fop, false, &tids);
+ osd->objecter->op_cancel(tids, -ECANCELED);
}
/**
}
if (!fop->blocking &&
- scrubber.write_blocked_by_scrub(oid)) {
+ write_blocked_by_scrub(oid)) {
if (fop->op) {
dout(10) << __func__ << " blocked by scrub" << dendl;
requeue_op(fop->op);
return -EAGAIN; // will retry
} else {
osd->logger->inc(l_osd_tier_try_flush_fail);
- cancel_flush(fop, false);
+ vector<ceph_tid_t> tids;
+ cancel_flush(fop, false, &tids);
+ osd->objecter->op_cancel(tids, -ECANCELED);
return -ECANCELED;
}
}
fop->op)) {
dout(20) << __func__ << " took write lock" << dendl;
} else if (fop->op) {
- dout(10) << __func__ << " waiting on write lock" << dendl;
+ dout(10) << __func__ << " waiting on write lock " << fop->op << " "
+ << fop->dup_ops << dendl;
close_op_ctx(ctx.release());
- requeue_op(fop->op);
- requeue_ops(fop->dup_ops);
+ // fop->op is now waiting on the lock; get fop->dup_ops to wait too.
+ for (auto op : fop->dup_ops) {
+ bool locked = ctx->lock_manager.get_lock_type(
+ ObjectContext::RWState::RWWRITE,
+ oid,
+ obc,
+ op);
+ assert(!locked);
+ }
return -EAGAIN; // will retry
} else {
dout(10) << __func__ << " failed write lock, no op; failing" << dendl;
close_op_ctx(ctx.release());
osd->logger->inc(l_osd_tier_try_flush_fail);
- cancel_flush(fop, false);
+ vector<ceph_tid_t> tids;
+ cancel_flush(fop, false, &tids);
+ osd->objecter->op_cancel(tids, -ECANCELED);
return -ECANCELED;
}
return -EINPROGRESS;
}
-void PrimaryLogPG::cancel_flush(FlushOpRef fop, bool requeue)
+// Cancel an in-flight flush.  The flush's objecter tid plus any
+// per-chunk io_tids are appended to *tids for deferred, batched
+// cancellation by the caller.
+void PrimaryLogPG::cancel_flush(FlushOpRef fop, bool requeue,
+ vector<ceph_tid_t> *tids)
{
dout(10) << __func__ << " " << fop->obc->obs.oi.soid << " tid "
<< fop->objecter_tid << dendl;
if (fop->objecter_tid) {
- osd->objecter->op_cancel(fop->objecter_tid, -ECANCELED);
+ tids->push_back(fop->objecter_tid);
fop->objecter_tid = 0;
}
- if (fop->blocking) {
+// also collect any outstanding per-chunk io tids and zero them out
+ if (fop->io_tids.size()) {
+ for (auto &p : fop->io_tids) {
+ tids->push_back(p.second);
+ p.second = 0;
+ }
+ }
+// only unblock if the obc is actually blocked; guards against a
+// double stop_block
+ if (fop->blocking && fop->obc->is_blocked()) {
fop->obc->stop_block();
kick_object_context_blocked(fop->obc);
}
flush_ops.erase(fop->obc->obs.oi.soid);
}
-void PrimaryLogPG::cancel_flush_ops(bool requeue)
+// Cancel every in-flight flush op on this PG, collecting objecter tids
+// into *tids for the caller to batch-cancel.
+void PrimaryLogPG::cancel_flush_ops(bool requeue, vector<ceph_tid_t> *tids)
{
dout(10) << __func__ << dendl;
map<hobject_t,FlushOpRef>::iterator p = flush_ops.begin();
while (p != flush_ops.end()) {
- cancel_flush((p++)->second, requeue);
+ cancel_flush((p++)->second, requeue, tids);
}
}
[this, entries, repop, on_complete]() {
ObjectStore::Transaction t;
eversion_t old_last_update = info.last_update;
- merge_new_log_entries(entries, t);
+ merge_new_log_entries(entries, t, pg_trim_to, min_last_complete_ondisk);
set<pg_shard_t> waiting_on;
pg_whoami.shard,
get_osdmap()->get_epoch(),
last_peering_reset,
- repop->rep_tid);
+ repop->rep_tid,
+ pg_trim_to,
+ min_last_complete_ondisk);
osd->send_message_osd_cluster(
peer.osd, m, get_osdmap()->get_epoch());
waiting_on.insert(peer);
int r = osd->store->queue_transaction(osr.get(), std::move(t), NULL);
assert(r == 0);
});
+
+ calc_trim_to();
}
void PrimaryLogPG::cancel_log_updates()
return;
}
- if (scrubber.write_blocked_by_scrub(obc->obs.oi.soid)) {
+ if (write_blocked_by_scrub(obc->obs.oi.soid)) {
dout(10) << "handle_watch_timeout waiting for scrub on obj "
<< obc->obs.oi.soid
<< dendl;
} else {
auto p = obc->ssc->snapset.clone_snaps.find(soid.snap);
assert(p != obc->ssc->snapset.clone_snaps.end());
+ if (p->second.empty()) {
+ dout(1) << __func__ << " " << soid << " empty snapset -- DNE" << dendl;
+ assert(!cct->_conf->osd_debug_verify_snaps);
+ return -ENOENT;
+ }
first = p->second.back();
last = p->second.front();
}
op->get_req());
assert(m->get_type() == MSG_OSD_PG_UPDATE_LOG_MISSING);
ObjectStore::Transaction t;
- append_log_entries_update_missing(m->entries, t);
+ boost::optional<eversion_t> op_trim_to, op_roll_forward_to;
+ if (m->pg_trim_to != eversion_t())
+ op_trim_to = m->pg_trim_to;
+ if (m->pg_roll_forward_to != eversion_t())
+ op_roll_forward_to = m->pg_roll_forward_to;
+
+ dout(20) << __func__ << " op_trim_to = " << op_trim_to << " op_roll_forward_to = " << op_roll_forward_to << dendl;
+
+ append_log_entries_update_missing(m->entries, t, op_trim_to, op_roll_forward_to);
+ eversion_t new_lcod = info.last_complete;
Context *complete = new FunctionContext(
[=](int) {
op->get_req());
lock();
if (!pg_has_reset_since(msg->get_epoch())) {
+ update_last_complete_ondisk(new_lcod);
MOSDPGUpdateLogMissingReply *reply =
new MOSDPGUpdateLogMissingReply(
spg_t(info.pgid.pgid, primary_shard().shard),
pg_whoami.shard,
msg->get_epoch(),
msg->min_epoch,
- msg->get_tid());
+ msg->get_tid(),
+ new_lcod);
reply->set_priority(CEPH_MSG_PRIO_HIGH);
msg->get_connection()->send_message(reply);
}
if (it != log_entry_update_waiting_on.end()) {
if (it->second.waiting_on.count(m->get_from())) {
it->second.waiting_on.erase(m->get_from());
+ if (m->last_complete_ondisk != eversion_t()) {
+ update_peer_last_complete_ondisk(m->get_from(), m->last_complete_ondisk);
+ }
} else {
osd->clog->error()
<< info.pgid << " got reply "
scrub_clear_state();
unreg_next_scrub();
- cancel_copy_ops(false);
- cancel_flush_ops(false);
- cancel_proxy_ops(false);
+
+ vector<ceph_tid_t> tids;
+ cancel_copy_ops(false, &tids);
+ cancel_flush_ops(false, &tids);
+ cancel_proxy_ops(false, &tids);
+ osd->objecter->op_cancel(tids, -ECANCELED);
+
apply_and_flush_repops(false);
cancel_log_updates();
// we must remove PGRefs, so do this this prior to release_backoffs() callers
clear_scrub_reserved();
- cancel_copy_ops(is_primary());
- cancel_flush_ops(is_primary());
- cancel_proxy_ops(is_primary());
+ vector<ceph_tid_t> tids;
+ cancel_copy_ops(is_primary(), &tids);
+ cancel_flush_ops(is_primary(), &tids);
+ cancel_proxy_ops(is_primary(), &tids);
+ osd->objecter->op_cancel(tids, -ECANCELED);
// requeue object waiters
for (auto& p : waiting_for_unreadable_object) {
if (bi->version < info.log_tail) {
dout(10) << __func__<< ": bi is old, rescanning local backfill_info"
<< dendl;
+ osr->flush();
if (last_update_applied >= info.log_tail) {
bi->version = last_update_applied;
} else {
- osr->flush();
bi->version = info.last_update;
}
scan_range(local_min, local_max, bi, handle);
// Once we hit a degraded object just skip
if (is_degraded_or_backfilling_object(aoid))
return;
- if (scrubber.write_blocked_by_scrub(aoid))
+ if (write_blocked_by_scrub(aoid))
return;
}
// Once we hit a degraded object just skip further trim
if (is_degraded_or_backfilling_object(aoid))
return;
- if (scrubber.write_blocked_by_scrub(aoid))
+ if (write_blocked_by_scrub(aoid))
return;
}
new_hset.using_gmt);
// If the current object is degraded we skip this persist request
- if (scrubber.write_blocked_by_scrub(oid))
+ if (write_blocked_by_scrub(oid))
return;
hit_set->seal();
osd->logger->inc(l_osd_agent_skip);
continue;
}
- if (scrubber.write_blocked_by_scrub(obc->obs.oi.soid)) {
+ if (range_intersects_scrub(obc->obs.oi.soid,
+ obc->obs.oi.soid.get_head())) {
dout(20) << __func__ << " skip (scrubbing) " << obc->obs.oi << dendl;
osd->logger->inc(l_osd_agent_skip);
continue;
*/
void PrimaryLogPG::scrub_snapshot_metadata(
ScrubMap &scrubmap,
- const map<hobject_t, pair<uint32_t, uint32_t>> &missing_digest)
+ const map<hobject_t,
+ pair<boost::optional<uint32_t>,
+ boost::optional<uint32_t>>> &missing_digest)
{
dout(10) << __func__ << dendl;
vector<snapid_t>::reverse_iterator curclone; // Defined only if snapset initialized
unsigned missing = 0;
inconsistent_snapset_wrapper soid_error, head_error;
+ unsigned soid_error_count = 0;
bufferlist last_data;
osd->clog->error() << mode << " " << info.pgid << " " << soid
<< " no '" << OI_ATTR << "' attr";
++scrubber.shallow_errors;
- soid_error.set_oi_attr_missing();
+ soid_error.set_info_missing();
} else {
bufferlist bv;
bv.push_back(p->second.attrs[OI_ATTR]);
osd->clog->error() << mode << " " << info.pgid << " " << soid
<< " can't decode '" << OI_ATTR << "' attr " << e.what();
++scrubber.shallow_errors;
- soid_error.set_oi_attr_corrupted();
- soid_error.set_oi_attr_missing(); // Not available too
+ soid_error.set_info_corrupted();
+ soid_error.set_info_missing(); // Not available too
}
}
++scrubber.shallow_errors;
soid_error.set_headless();
scrubber.store->add_snap_error(pool.id, soid_error);
+ ++soid_error_count;
if (head && soid.get_head() == head->get_head())
head_error.set_clone(soid.snap);
continue;
}
// Save previous head error information
- if (head && head_error.errors)
+ if (head && (head_error.errors || soid_error_count))
scrubber.store->add_snap_error(pool.id, head_error);
// Set this as a new head object
head = soid;
missing = 0;
head_error = soid_error;
+ soid_error_count = 0;
dout(20) << __func__ << " " << mode << " new head " << head << dendl;
<< " no '" << SS_ATTR << "' attr";
++scrubber.shallow_errors;
snapset = boost::none;
- head_error.set_ss_attr_missing();
+ head_error.set_snapset_missing();
} else {
bufferlist bl;
bl.push_back(p->second.attrs[SS_ATTR]);
try {
snapset = SnapSet(); // Initialize optional<> before decoding into it
::decode(snapset.get(), blp);
+ head_error.ss_bl.push_back(p->second.attrs[SS_ATTR]);
} catch (buffer::error& e) {
snapset = boost::none;
osd->clog->error() << mode << " " << info.pgid << " " << soid
<< " can't decode '" << SS_ATTR << "' attr " << e.what();
++scrubber.shallow_errors;
- head_error.set_ss_attr_corrupted();
+ head_error.set_snapset_corrupted();
}
}
osd->clog->error() << mode << " " << info.pgid << " " << soid
<< " snaps.seq not set";
++scrubber.shallow_errors;
- head_error.set_snapset_mismatch();
+ head_error.set_snapset_error();
}
}
// what's next?
++curclone;
- if (soid_error.errors)
+ if (soid_error.errors) {
scrubber.store->add_snap_error(pool.id, soid_error);
+ ++soid_error_count;
+ }
}
scrub_cstat.add(stat);
log_missing(missing, head, osd->clog, info.pgid, __func__,
mode, pool.info.allow_incomplete_clones());
}
- if (head && head_error.errors)
+ if (head && (head_error.errors || soid_error_count))
scrubber.store->add_snap_error(pool.id, head_error);
- for (map<hobject_t,pair<uint32_t,uint32_t>>::const_iterator p =
- missing_digest.begin();
- p != missing_digest.end();
- ++p) {
+ for (auto p = missing_digest.begin(); p != missing_digest.end(); ++p) {
if (p->first.is_snapdir())
continue;
dout(10) << __func__ << " recording digests for " << p->first << dendl;
OpContextUPtr ctx = simple_opc_create(obc);
ctx->at_version = get_next_version();
ctx->mtime = utime_t(); // do not update mtime
- ctx->new_obs.oi.set_data_digest(p->second.first);
- ctx->new_obs.oi.set_omap_digest(p->second.second);
+ if (p->second.first) {
+ ctx->new_obs.oi.set_data_digest(*p->second.first);
+ } else {
+ ctx->new_obs.oi.clear_data_digest();
+ }
+ if (p->second.second) {
+ ctx->new_obs.oi.set_omap_digest(*p->second.second);
+ } else {
+ ctx->new_obs.oi.clear_omap_digest();
+ }
finish_ctx(ctx.get(), pg_log_entry_t::MODIFY);
ctx->register_on_success(