if (limit != eversion_t() &&
limit != pg_trim_to &&
pg_log.get_log().approx_size() > target) {
- size_t num_to_trim = pg_log.get_log().approx_size() - target;
- if (num_to_trim < cct->_conf->osd_pg_log_trim_min) {
+ size_t num_to_trim = MIN(pg_log.get_log().approx_size() - target,
+ cct->_conf->osd_pg_log_trim_max);
+ if (num_to_trim < cct->_conf->osd_pg_log_trim_min &&
+ cct->_conf->osd_pg_log_trim_max >= cct->_conf->osd_pg_log_trim_min) {
return;
}
list<pg_log_entry_t>::const_iterator it = pg_log.get_log().log.begin();
in_progress_proxy_ops.erase(p);
}
-void PrimaryLogPG::cancel_proxy_read(ProxyReadOpRef prdop)
+// Cancel an in-flight proxy read.  Instead of calling op_cancel inline
+// (the old removed line below), the objecter tid, if any, is appended to
+// *tids so the caller can batch every cancellation into a single
+// Objecter::op_cancel call.
+void PrimaryLogPG::cancel_proxy_read(ProxyReadOpRef prdop,
+ vector<ceph_tid_t> *tids)
{
dout(10) << __func__ << " " << prdop->soid << dendl;
prdop->canceled = true;
// cancel objecter op, if we can
if (prdop->objecter_tid) {
- osd->objecter->op_cancel(prdop->objecter_tid, -ECANCELED);
+ tids->push_back(prdop->objecter_tid);
for (uint32_t i = 0; i < prdop->ops.size(); i++) {
prdop->ops[i].outdata.clear();
}
}
}
-void PrimaryLogPG::cancel_proxy_ops(bool requeue)
+void PrimaryLogPG::cancel_proxy_ops(bool requeue, vector<ceph_tid_t> *tids)
{
dout(10) << __func__ << dendl;
// cancel proxy reads
map<ceph_tid_t, ProxyReadOpRef>::iterator p = proxyread_ops.begin();
while (p != proxyread_ops.end()) {
- cancel_proxy_read((p++)->second);
+ cancel_proxy_read((p++)->second, tids);
}
// cancel proxy writes
map<ceph_tid_t, ProxyWriteOpRef>::iterator q = proxywrite_ops.begin();
while (q != proxywrite_ops.end()) {
- cancel_proxy_write((q++)->second);
+ cancel_proxy_write((q++)->second, tids);
}
if (requeue) {
pwop->ctx = NULL;
}
-void PrimaryLogPG::cancel_proxy_write(ProxyWriteOpRef pwop)
+void PrimaryLogPG::cancel_proxy_write(ProxyWriteOpRef pwop,
+ vector<ceph_tid_t> *tids)
{
dout(10) << __func__ << " " << pwop->soid << dendl;
pwop->canceled = true;
// cancel objecter op, if we can
if (pwop->objecter_tid) {
- osd->objecter->op_cancel(pwop->objecter_tid, -ECANCELED);
+ tids->push_back(pwop->objecter_tid);
delete pwop->ctx;
pwop->ctx = NULL;
proxywrite_ops.erase(pwop->objecter_tid);
// FIXME: if the src etc match, we could avoid restarting from the
// beginning.
CopyOpRef cop = copy_ops[dest];
- cancel_copy(cop, false);
+ vector<ceph_tid_t> tids;
+ cancel_copy(cop, false, &tids);
+ osd->objecter->op_cancel(tids, -ECANCELED);
}
CopyOpRef cop(std::make_shared<CopyOp>(cb, obc, src, oloc, version, flags,
void PrimaryLogPG::process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r)
{
+ vector<ceph_tid_t> tids;
dout(10) << __func__ << " " << oid << " tid " << tid
<< " " << cpp_strerror(r) << dendl;
map<hobject_t,CopyOpRef>::iterator p = copy_ops.find(oid);
for (map<ceph_tid_t, ProxyReadOpRef>::iterator it = proxyread_ops.begin();
it != proxyread_ops.end();) {
if (it->second->soid == cobc->obs.oi.soid) {
- cancel_proxy_read((it++)->second);
+ cancel_proxy_read((it++)->second, &tids);
} else {
++it;
}
for (map<ceph_tid_t, ProxyWriteOpRef>::iterator it = proxywrite_ops.begin();
it != proxywrite_ops.end();) {
if (it->second->soid == cobc->obs.oi.soid) {
- cancel_proxy_write((it++)->second);
+ cancel_proxy_write((it++)->second, &tids);
} else {
++it;
}
}
+ osd->objecter->op_cancel(tids, -ECANCELED);
kick_proxy_ops_blocked(cobc->obs.oi.soid);
}
kick_object_context_blocked(cobc);
}
+// Cancel every proxy read and proxy write whose target object is `oid`,
+// collect all of their objecter tids, cancel them with one batched
+// Objecter::op_cancel call, then kick any proxy ops blocked on the object.
+// Iterators are advanced with (it++) before the cancel call because
+// cancel_proxy_read/write erase the entry from the map they iterate.
+void PrimaryLogPG::cancel_and_requeue_proxy_ops(hobject_t oid) {
+ vector<ceph_tid_t> tids;
+ for (map<ceph_tid_t, ProxyReadOpRef>::iterator it = proxyread_ops.begin();
+ it != proxyread_ops.end();) {
+ if (it->second->soid == oid) {
+ cancel_proxy_read((it++)->second, &tids);
+ } else {
+ ++it;
+ }
+ }
+ for (map<ceph_tid_t, ProxyWriteOpRef>::iterator it = proxywrite_ops.begin();
+ it != proxywrite_ops.end();) {
+ if (it->second->soid == oid) {
+ cancel_proxy_write((it++)->second, &tids);
+ } else {
+ ++it;
+ }
+ }
+ osd->objecter->op_cancel(tids, -ECANCELED);
+ kick_proxy_ops_blocked(oid);
+}
+
void PrimaryLogPG::_write_copy_chunk(CopyOpRef cop, PGTransaction *t)
{
dout(20) << __func__ << " " << cop
agent_choose_mode();
}
-void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue)
+void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue,
+ vector<ceph_tid_t> *tids)
{
dout(10) << __func__ << " " << cop->obc->obs.oi.soid
<< " from " << cop->src << " " << cop->oloc
// cancel objecter op, if we can
if (cop->objecter_tid) {
- osd->objecter->op_cancel(cop->objecter_tid, -ECANCELED);
+ tids->push_back(cop->objecter_tid);
cop->objecter_tid = 0;
if (cop->objecter_tid2) {
- osd->objecter->op_cancel(cop->objecter_tid2, -ECANCELED);
+ tids->push_back(cop->objecter_tid2);
cop->objecter_tid2 = 0;
}
}
cop->obc = ObjectContextRef();
}
-void PrimaryLogPG::cancel_copy_ops(bool requeue)
+// Cancel all outstanding copy ops, accumulating their objecter tids into
+// *tids for the caller to cancel in one batch.  (p++) is used because
+// cancel_copy erases the current copy_ops entry.
+void PrimaryLogPG::cancel_copy_ops(bool requeue, vector<ceph_tid_t> *tids)
{
dout(10) << __func__ << dendl;
map<hobject_t,CopyOpRef>::iterator p = copy_ops.begin();
while (p != copy_ops.end()) {
// requeue this op? can I queue up all of them?
- cancel_copy((p++)->second, requeue);
+ cancel_copy((p++)->second, requeue, tids);
}
}
osd->reply_op_error(fop->dup_ops.front(), -EBUSY);
fop->dup_ops.pop_front();
}
- cancel_flush(fop, false);
+ vector<ceph_tid_t> tids;
+ cancel_flush(fop, false, &tids);
+ osd->objecter->op_cancel(tids, -ECANCELED);
}
/**
return -EAGAIN; // will retry
} else {
osd->logger->inc(l_osd_tier_try_flush_fail);
- cancel_flush(fop, false);
+ vector<ceph_tid_t> tids;
+ cancel_flush(fop, false, &tids);
+ osd->objecter->op_cancel(tids, -ECANCELED);
return -ECANCELED;
}
}
dout(10) << __func__ << " failed write lock, no op; failing" << dendl;
close_op_ctx(ctx.release());
osd->logger->inc(l_osd_tier_try_flush_fail);
- cancel_flush(fop, false);
+ vector<ceph_tid_t> tids;
+ cancel_flush(fop, false, &tids);
+ osd->objecter->op_cancel(tids, -ECANCELED);
return -ECANCELED;
}
return -EINPROGRESS;
}
-void PrimaryLogPG::cancel_flush(FlushOpRef fop, bool requeue,
+// Cancel an in-flight flush op.  The primary objecter tid and every tid
+// recorded in fop->io_tids are appended to *tids (and zeroed locally) so
+// the caller can issue a single batched Objecter::op_cancel.  The obc is
+// only unblocked when it is actually in the blocked state, guarding
+// against a stop_block() on an object that was never blocked.
+void PrimaryLogPG::cancel_flush(FlushOpRef fop, bool requeue,
+ vector<ceph_tid_t> *tids)
{
dout(10) << __func__ << " " << fop->obc->obs.oi.soid << " tid "
<< fop->objecter_tid << dendl;
if (fop->objecter_tid) {
- osd->objecter->op_cancel(fop->objecter_tid, -ECANCELED);
+ tids->push_back(fop->objecter_tid);
fop->objecter_tid = 0;
}
- if (fop->blocking) {
+ if (fop->io_tids.size()) {
+ for (auto &p : fop->io_tids) {
+ tids->push_back(p.second);
+ p.second = 0;
+ }
+ }
+ if (fop->blocking && fop->obc->is_blocked()) {
fop->obc->stop_block();
kick_object_context_blocked(fop->obc);
}
flush_ops.erase(fop->obc->obs.oi.soid);
}
-void PrimaryLogPG::cancel_flush_ops(bool requeue)
+// Cancel all outstanding flush ops, accumulating their objecter tids into
+// *tids for the caller to batch-cancel.  (p++) is used because
+// cancel_flush erases the current flush_ops entry.
+void PrimaryLogPG::cancel_flush_ops(bool requeue, vector<ceph_tid_t> *tids)
{
dout(10) << __func__ << dendl;
map<hobject_t,FlushOpRef>::iterator p = flush_ops.begin();
while (p != flush_ops.end()) {
- cancel_flush((p++)->second, requeue);
+ cancel_flush((p++)->second, requeue, tids);
}
}
[this, entries, repop, on_complete]() {
ObjectStore::Transaction t;
eversion_t old_last_update = info.last_update;
- merge_new_log_entries(entries, t);
+ merge_new_log_entries(entries, t, pg_trim_to, min_last_complete_ondisk);
set<pg_shard_t> waiting_on;
pg_whoami.shard,
get_osdmap()->get_epoch(),
last_peering_reset,
- repop->rep_tid);
+ repop->rep_tid,
+ pg_trim_to,
+ min_last_complete_ondisk);
osd->send_message_osd_cluster(
peer.osd, m, get_osdmap()->get_epoch());
waiting_on.insert(peer);
int r = osd->store->queue_transaction(osr.get(), std::move(t), NULL);
assert(r == 0);
});
+
+ calc_trim_to();
}
void PrimaryLogPG::cancel_log_updates()
op->get_req());
assert(m->get_type() == MSG_OSD_PG_UPDATE_LOG_MISSING);
ObjectStore::Transaction t;
- append_log_entries_update_missing(m->entries, t);
+ boost::optional<eversion_t> op_trim_to, op_roll_forward_to;
+ if (m->pg_trim_to != eversion_t())
+ op_trim_to = m->pg_trim_to;
+ if (m->pg_roll_forward_to != eversion_t())
+ op_roll_forward_to = m->pg_roll_forward_to;
+
+ dout(20) << __func__ << " op_trim_to = " << op_trim_to << " op_roll_forward_to = " << op_roll_forward_to << dendl;
+
+ append_log_entries_update_missing(m->entries, t, op_trim_to, op_roll_forward_to);
+ eversion_t new_lcod = info.last_complete;
Context *complete = new FunctionContext(
[=](int) {
op->get_req());
lock();
if (!pg_has_reset_since(msg->get_epoch())) {
+ update_last_complete_ondisk(new_lcod);
MOSDPGUpdateLogMissingReply *reply =
new MOSDPGUpdateLogMissingReply(
spg_t(info.pgid.pgid, primary_shard().shard),
pg_whoami.shard,
msg->get_epoch(),
msg->min_epoch,
- msg->get_tid());
+ msg->get_tid(),
+ new_lcod);
reply->set_priority(CEPH_MSG_PRIO_HIGH);
msg->get_connection()->send_message(reply);
}
if (it != log_entry_update_waiting_on.end()) {
if (it->second.waiting_on.count(m->get_from())) {
it->second.waiting_on.erase(m->get_from());
+ if (m->last_complete_ondisk != eversion_t()) {
+ update_peer_last_complete_ondisk(m->get_from(), m->last_complete_ondisk);
+ }
} else {
osd->clog->error()
<< info.pgid << " got reply "
scrub_clear_state();
unreg_next_scrub();
- cancel_copy_ops(false);
- cancel_flush_ops(false);
- cancel_proxy_ops(false);
+
+ vector<ceph_tid_t> tids;
+ cancel_copy_ops(false, &tids);
+ cancel_flush_ops(false, &tids);
+ cancel_proxy_ops(false, &tids);
+ osd->objecter->op_cancel(tids, -ECANCELED);
+
apply_and_flush_repops(false);
cancel_log_updates();
// we must remove PGRefs, so do this this prior to release_backoffs() callers
clear_scrub_reserved();
- cancel_copy_ops(is_primary());
- cancel_flush_ops(is_primary());
- cancel_proxy_ops(is_primary());
+ vector<ceph_tid_t> tids;
+ cancel_copy_ops(is_primary(), &tids);
+ cancel_flush_ops(is_primary(), &tids);
+ cancel_proxy_ops(is_primary(), &tids);
+ osd->objecter->op_cancel(tids, -ECANCELED);
// requeue object waiters
for (auto& p : waiting_for_unreadable_object) {
vector<snapid_t>::reverse_iterator curclone; // Defined only if snapset initialized
unsigned missing = 0;
inconsistent_snapset_wrapper soid_error, head_error;
+ unsigned soid_error_count = 0;
bufferlist last_data;
osd->clog->error() << mode << " " << info.pgid << " " << soid
<< " no '" << OI_ATTR << "' attr";
++scrubber.shallow_errors;
- soid_error.set_oi_attr_missing();
+ soid_error.set_info_missing();
} else {
bufferlist bv;
bv.push_back(p->second.attrs[OI_ATTR]);
osd->clog->error() << mode << " " << info.pgid << " " << soid
<< " can't decode '" << OI_ATTR << "' attr " << e.what();
++scrubber.shallow_errors;
- soid_error.set_oi_attr_corrupted();
- soid_error.set_oi_attr_missing(); // Not available too
+ soid_error.set_info_corrupted();
+ soid_error.set_info_missing(); // Not available too
}
}
++scrubber.shallow_errors;
soid_error.set_headless();
scrubber.store->add_snap_error(pool.id, soid_error);
+ ++soid_error_count;
if (head && soid.get_head() == head->get_head())
head_error.set_clone(soid.snap);
continue;
}
// Save previous head error information
- if (head && head_error.errors)
+ if (head && (head_error.errors || soid_error_count))
scrubber.store->add_snap_error(pool.id, head_error);
// Set this as a new head object
head = soid;
missing = 0;
head_error = soid_error;
+ soid_error_count = 0;
dout(20) << __func__ << " " << mode << " new head " << head << dendl;
<< " no '" << SS_ATTR << "' attr";
++scrubber.shallow_errors;
snapset = boost::none;
- head_error.set_ss_attr_missing();
+ head_error.set_snapset_missing();
} else {
bufferlist bl;
bl.push_back(p->second.attrs[SS_ATTR]);
try {
snapset = SnapSet(); // Initialize optional<> before decoding into it
::decode(snapset.get(), blp);
+ head_error.ss_bl.push_back(p->second.attrs[SS_ATTR]);
} catch (buffer::error& e) {
snapset = boost::none;
osd->clog->error() << mode << " " << info.pgid << " " << soid
<< " can't decode '" << SS_ATTR << "' attr " << e.what();
++scrubber.shallow_errors;
- head_error.set_ss_attr_corrupted();
+ head_error.set_snapset_corrupted();
}
}
osd->clog->error() << mode << " " << info.pgid << " " << soid
<< " snaps.seq not set";
++scrubber.shallow_errors;
- head_error.set_snapset_mismatch();
+ head_error.set_snapset_error();
}
}
// what's next?
++curclone;
- if (soid_error.errors)
+ if (soid_error.errors) {
scrubber.store->add_snap_error(pool.id, soid_error);
+ ++soid_error_count;
+ }
}
scrub_cstat.add(stat);
log_missing(missing, head, osd->clog, info.pgid, __func__,
mode, pool.info.allow_incomplete_clones());
}
- if (head && head_error.errors)
+ if (head && (head_error.errors || soid_error_count))
scrubber.store->add_snap_error(pool.id, head_error);
for (map<hobject_t,pair<uint32_t,uint32_t>>::const_iterator p =