PrimaryLogPG *pg,
PrimaryLogPG::OpContext *ctx) : pg(pg), opcontext(ctx) {}
void finish(int r) override {
- if (r < 0)
- opcontext->async_read_result = r;
opcontext->finish_read(pg);
}
~OnReadComplete() override {}
assert(pg->in_progress_async_reads.size());
assert(pg->in_progress_async_reads.front().second == this);
pg->in_progress_async_reads.pop_front();
- pg->complete_read_ctx(async_read_result, this);
+
+ // Restart the op context now that all reads have been
+ // completed. Read failures will be handled by the op finisher
+ pg->execute_ctx(this);
}
}
-class CopyFromCallback: public PrimaryLogPG::CopyCallback {
+class CopyFromCallback : public PrimaryLogPG::CopyCallback {
public:
- PrimaryLogPG::CopyResults *results;
- int retval;
+ PrimaryLogPG::CopyResults *results = nullptr;
PrimaryLogPG::OpContext *ctx;
- explicit CopyFromCallback(PrimaryLogPG::OpContext *ctx_)
- : results(NULL),
- retval(0),
- ctx(ctx_) {}
+ OSDOp &osd_op;
+
+ CopyFromCallback(PrimaryLogPG::OpContext *ctx, OSDOp &osd_op)
+ : ctx(ctx), osd_op(osd_op) {
+ }
~CopyFromCallback() override {}
void finish(PrimaryLogPG::CopyCallbackResults results_) override {
results = results_.get<1>();
int r = results_.get<0>();
- retval = r;
// for finish_copyfrom
ctx->user_at_version = results->user_version;
if (r >= 0) {
ctx->pg->execute_ctx(ctx);
- }
- ctx->copy_cb = NULL;
- if (r < 0) {
+ } else {
if (r != -ECANCELED) { // on cancel just toss it out; client resends
if (ctx->op)
ctx->pg->osd->reply_op_error(ctx->op, r);
uint64_t get_data_size() {
return results->object_size;
}
- int get_result() {
- return retval;
+};
+
+// OpFinisher for a completed copy-from: once the copy data is available,
+// execute() re-enters the PG via finish_copyfrom() to apply the copied
+// object to the op context.
+struct CopyFromFinisher : public PrimaryLogPG::OpFinisher {
+  CopyFromCallback *copy_from_callback;
+
+  CopyFromFinisher(CopyFromCallback *copy_from_callback)
+    : copy_from_callback(copy_from_callback) {
+  }
+
+  int execute() override {
+    // instance will be destructed after this method completes
+    copy_from_callback->ctx->pg->finish_copyfrom(copy_from_callback);
+    return 0;
  }
};
const hobject_t &hoid,
const ObjectRecoveryInfo &_recovery_info,
ObjectContextRef obc,
+ bool is_delete,
ObjectStore::Transaction *t
)
{
ObjectRecoveryInfo recovery_info(_recovery_info);
clear_object_snap_mapping(t, hoid);
- if (recovery_info.soid.is_snap()) {
+ if (!is_delete && recovery_info.soid.is_snap()) {
OSDriver::OSTransaction _t(osdriver.get_transaction(t));
set<snapid_t> snaps;
dout(20) << " snapset " << recovery_info.ss
snaps,
&_t);
}
- if (pg_log.get_missing().is_missing(recovery_info.soid) &&
+ if (!is_delete && pg_log.get_missing().is_missing(recovery_info.soid) &&
pg_log.get_missing().get_items().find(recovery_info.soid)->second.need > recovery_info.version) {
assert(is_primary());
const pg_log_entry_t *latest = pg_log.get_log().objects.find(recovery_info.soid)->second;
recover_got(recovery_info.soid, recovery_info.version);
if (is_primary()) {
- assert(obc);
- obc->obs.exists = true;
- obc->ondisk_write_lock();
-
- bool got = obc->get_recovery_read();
- assert(got);
+ if (!is_delete) {
+ obc->obs.exists = true;
+ obc->ondisk_write_lock();
- assert(recovering.count(obc->obs.oi.soid));
- recovering[obc->obs.oi.soid] = obc;
- obc->obs.oi = recovery_info.oi; // may have been updated above
+ bool got = obc->get_recovery_read();
+ assert(got);
+ assert(recovering.count(obc->obs.oi.soid));
+ recovering[obc->obs.oi.soid] = obc;
+ obc->obs.oi = recovery_info.oi; // may have been updated above
+ t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc));
+ }
t->register_on_applied(new C_OSD_AppliedRecoveredObject(this, obc));
- t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc));
publish_stats_to_osd();
assert(missing_loc.needs_recovery(hoid));
- missing_loc.add_location(hoid, pg_whoami);
+ if (!is_delete)
+ missing_loc.add_location(hoid, pg_whoami);
release_backoffs(hoid);
if (!is_unreadable_object(hoid)) {
auto unreadable_object_entry = waiting_for_unreadable_object.find(hoid);
waiting_for_unreadable_object.erase(unreadable_object_entry);
}
}
- if (pg_log.get_missing().get_items().size() == 0) {
- requeue_ops(waiting_for_all_missing);
- waiting_for_all_missing.clear();
- }
} else {
t->register_on_applied(
new C_OSD_AppliedRecoveredObjectReplica(this));
void PrimaryLogPG::on_global_recover(
const hobject_t &soid,
- const object_stat_sum_t &stat_diff)
+ const object_stat_sum_t &stat_diff,
+ bool is_delete)
{
info.stats.stats.sum.add(stat_diff);
missing_loc.recovered(soid);
map<hobject_t, ObjectContextRef>::iterator i = recovering.find(soid);
assert(i != recovering.end());
- // recover missing won't have had an obc, but it gets filled in
- // during on_local_recover
- assert(i->second);
- list<OpRequestRef> requeue_list;
- i->second->drop_recovery_read(&requeue_list);
- requeue_ops(requeue_list);
+ if (!is_delete) {
+ // recover missing won't have had an obc, but it gets filled in
+ // during on_local_recover
+ assert(i->second);
+ list<OpRequestRef> requeue_list;
+ i->second->drop_recovery_read(&requeue_list);
+ requeue_ops(requeue_list);
+ }
backfills_in_flight.erase(soid);
osd->send_message_osd_cluster(m, con);
}
+// Error path for an object the primary failed to recover: records the
+// failure via primary_failed()/primary_error() and puts the object back
+// into the missing set (backfill_add_missing) so recovery can retry it.
+void PrimaryLogPG::on_primary_error(
+  const hobject_t &oid,
+  eversion_t v)
+{
+  // logged at level 0 so the failure is always visible in the OSD log
+  dout(0) << __func__ << ": oid " << oid << " version " << v << dendl;
+  primary_failed(oid);
+  primary_error(oid, v);
+  backfill_add_missing(oid, v);
+}
+
+// Drop any in-flight backfill state for `oid` and record it as missing at
+// version `v` in missing_loc so the recovery machinery will re-recover it.
+void PrimaryLogPG::backfill_add_missing(
+  const hobject_t &oid,
+  eversion_t v)
+{
+  // logged at level 0 so the failure is always visible in the OSD log
+  dout(0) << __func__ << ": oid " << oid << " version " << v << dendl;
+  backfills_in_flight.erase(oid);
+  missing_loc.add_missing(oid, v, eversion_t());
+}
+
ConnectionRef PrimaryLogPG::get_con_osd_cluster(
int peer, epoch_t from_epoch)
{
PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
if (is_missing_object(soid)) {
recover_missing(soid, v, cct->_conf->osd_client_op_priority, h);
+ } else if (missing_loc.is_deleted(soid)) {
+ prep_object_replica_deletes(soid, v, h);
} else {
prep_object_replica_pushes(soid, v, h);
}
op->mark_delayed("waiting for missing object");
}
-void PrimaryLogPG::wait_for_all_missing(OpRequestRef op)
-{
- waiting_for_all_missing.push_back(op);
- op->mark_delayed("waiting for all missing");
-}
-
bool PrimaryLogPG::is_degraded_or_backfilling_object(const hobject_t& soid)
{
/* The conditions below may clear (on_local_recover, before we queue
op->mark_delayed("waiting for cache not full");
}
+// Park an op until the PG becomes clean so the primary can repair the
+// object first.  NOTE(review): the requeue of
+// waiting_for_clean_to_primary_repair happens elsewhere (not visible in
+// this hunk) — confirm it is drained when the PG goes clean.
+void PrimaryLogPG::block_for_clean(
+  const hobject_t& oid, OpRequestRef op)
+{
+  dout(20) << __func__ << ": blocking object " << oid
+           << " on primary repair" << dendl;
+  waiting_for_clean_to_primary_repair.push_back(op);
+  op->mark_delayed("waiting for clean to repair");
+}
+
void PrimaryLogPG::block_write_on_snap_rollback(
const hobject_t& oid, ObjectContextRef obc, OpRequestRef op)
{
void PrimaryLogPG::maybe_force_recovery()
{
- // no force if not in degraded/recovery/backfill stats
+ // no force if not in degraded/recovery/backfill states
if (!is_degraded() &&
!state_test(PG_STATE_RECOVERING |
PG_STATE_RECOVERY_WAIT |
- PG_STATE_BACKFILL |
+ PG_STATE_BACKFILLING |
PG_STATE_BACKFILL_WAIT |
PG_STATE_BACKFILL_TOOFULL))
return;
ConnectionRef con,
ceph_tid_t tid)
{
- const pg_missing_t &missing = pg_log.get_missing();
+ const auto &missing = pg_log.get_missing();
string prefix;
string format;
f->open_object_section("pg");
f->dump_string("state", pg_state_string(get_state()));
f->dump_stream("snap_trimq") << snap_trimq;
+ f->dump_unsigned("snap_trimq_len", snap_trimq.size());
f->dump_unsigned("epoch", get_osdmap()->get_epoch());
f->open_array_section("up");
for (vector<int>::iterator p = up.begin(); p != up.end(); ++p)
if (candidate.get_namespace() == cct->_conf->osd_hit_set_namespace)
continue;
+ if (missing_loc.is_deleted(candidate))
+ continue;
+
// skip wrong namespace
if (m->get_hobj().nspace != librados::all_nspaces &&
candidate.get_namespace() != m->get_hobj().nspace)
if (candidate.get_namespace() != m->get_hobj().nspace)
continue;
+ if (missing_loc.is_deleted(candidate))
+ continue;
+
if (filter && !pgls_filter(filter, candidate, filter_out))
continue;
if (is_degraded() ||
state_test(PG_STATE_RECOVERING |
PG_STATE_RECOVERY_WAIT |
- PG_STATE_BACKFILL |
+ PG_STATE_BACKFILLING |
PG_STATE_BACKFILL_WAIT |
PG_STATE_BACKFILL_TOOFULL)) {
target = cct->_conf->osd_max_pg_log_entries;
<< ", queue on waiting_for_map " << op->get_source() << dendl;
waiting_for_map[op->get_source()].push_back(op);
op->mark_delayed("op must wait for map");
+ osd->request_osdmap_update(op->min_epoch);
return;
}
}
}
- if (flushes_in_progress > 0) {
- dout(20) << flushes_in_progress
- << " flushes_in_progress pending "
- << "waiting for active on " << op << dendl;
- waiting_for_peered.push_back(op);
- op->mark_delayed("waiting for peered");
- return;
- }
-
if (!is_peered()) {
// Delay unless PGBackend says it's ok
if (pgbackend->can_handle_while_inactive(op)) {
}
}
+ if (flushes_in_progress > 0) {
+ dout(20) << flushes_in_progress
+ << " flushes_in_progress pending "
+ << "waiting for flush on " << op << dendl;
+ waiting_for_flush.push_back(op);
+ op->mark_delayed("waiting for flush");
+ return;
+ }
+
assert(is_peered() && flushes_in_progress == 0);
if (pgbackend->handle_message(op))
return;
}
}
- if (op->includes_pg_op()) {
- return do_pg_op(op);
- }
-
if (!op_has_sufficient_caps(op)) {
osd->reply_op_error(op, -EPERM);
return;
}
+ if (op->includes_pg_op()) {
+ return do_pg_op(op);
+ }
+
// object name too long?
if (m->get_oid().name.size() > cct->_conf->osd_max_object_name_len) {
dout(4) << "do_op name is longer than "
// missing object?
if (is_unreadable_object(head)) {
+ if (!is_primary()) {
+ osd->reply_op_error(op, -EAGAIN);
+ return;
+ }
if (can_backoff &&
(g_conf->osd_backoff_on_degraded ||
(g_conf->osd_backoff_on_unfound && missing_loc.is_unfound(head)))) {
if (write_ordered && is_degraded_or_backfilling_object(head)) {
if (can_backoff && g_conf->osd_backoff_on_degraded) {
add_backoff(session, head, head);
+ maybe_kick_recovery(head);
} else {
wait_for_degraded_object(head, op);
}
return;
}
+ if (obc.get() && obc->obs.exists && obc->obs.oi.has_manifest()) {
+ if (maybe_handle_manifest(op,
+ write_ordered,
+ obc))
+ return;
+ }
+
if (maybe_handle_cache(op,
write_ordered,
obc,
fill_in_copy_get_noent(op, oid, m->ops[0]);
return;
}
- dout(20) << __func__ << "find_object_context got error " << r << dendl;
+ dout(20) << __func__ << ": find_object_context got error " << r << dendl;
if (op->may_write() &&
- get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) {
+ get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
record_write_error(op, oid, nullptr, r);
} else {
osd->reply_op_error(op, r);
}
}
- OpContext *ctx = new OpContext(op, m->get_reqid(), m->ops, obc, this);
+ OpContext *ctx = new OpContext(op, m->get_reqid(), &m->ops, obc, this);
if (!obc->obs.exists)
ctx->snapset_obc = get_object_context(obc->obs.oi.soid.get_snapdir(), false);
dout(20) << __func__ << " returned an error: " << r << dendl;
close_op_ctx(ctx);
if (op->may_write() &&
- get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) {
+ get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
record_write_error(op, oid, nullptr, r);
} else {
osd->reply_op_error(op, r);
maybe_force_recovery();
}
+// Decide whether an op targeting a manifest object must be proxied to the
+// redirect target instead of being executed locally.
+//   NOOP           - caller proceeds normally (IGNORE_REDIRECT flag set,
+//                    object blocked for a write-ordered op, or the op
+//                    itself is a SET_REDIRECT)
+//   HANDLED_PROXY  - op was forwarded via do_proxy_read/do_proxy_write
+// NOTE(review): obc is dereferenced in the switch below without a null
+// check; callers are expected to pass a valid obc carrying a manifest.
+PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_manifest_detail(
+  OpRequestRef op,
+  bool write_ordered,
+  ObjectContextRef obc)
+{
+  if (static_cast<const MOSDOp *>(op->get_req())->get_flags() &
+      CEPH_OSD_FLAG_IGNORE_REDIRECT) {
+    dout(20) << __func__ << ": ignoring redirect due to flag" << dendl;
+    return cache_result_t::NOOP;
+  }
+
+  if (obc)
+    dout(10) << __func__ << " " << obc->obs.oi << " "
+       << (obc->obs.exists ? "exists" : "DNE")
+       << dendl;
+
+  // if it is write-ordered and blocked, stop now
+  if (obc.get() && obc->is_blocked() && write_ordered) {
+    // we're already doing something with this object
+    dout(20) << __func__ << " blocked on " << obc->obs.oi.soid << dendl;
+    return cache_result_t::NOOP;
+  }
+
+  // Let SET_REDIRECT sub-ops through untouched.  Iterate the message's op
+  // vector by const reference — the original copied the whole vector just
+  // to scan it, and its inner `op` shadowed the OpRequestRef parameter.
+  const vector<OSDOp> &ops = static_cast<const MOSDOp*>(op->get_req())->ops;
+  for (const OSDOp& osd_op : ops) {
+    if (osd_op.op.op == CEPH_OSD_OP_SET_REDIRECT) {
+      return cache_result_t::NOOP;
+    }
+  }
+
+  switch (obc->obs.oi.manifest.type) {
+  case object_manifest_t::TYPE_REDIRECT:
+    if (op->may_write() || write_ordered) {
+      do_proxy_write(op, obc->obs.oi.soid, obc);
+    } else {
+      do_proxy_read(op, obc);
+    }
+    return cache_result_t::HANDLED_PROXY;
+  case object_manifest_t::TYPE_CHUNKED:
+  default:
+    assert(0 == "unrecognized manifest type");
+  }
+
+  return cache_result_t::NOOP;
+}
+
void PrimaryLogPG::record_write_error(OpRequestRef op, const hobject_t &soid,
MOSDOpReply *orig_reply, int r)
{
dout(20) << __func__ << " r=" << r << dendl;
assert(op->may_write());
const osd_reqid_t &reqid = static_cast<const MOSDOp*>(op->get_req())->get_reqid();
- ObjectContextRef obc;
- mempool::osd::list<pg_log_entry_t> entries;
+ mempool::osd_pglog::list<pg_log_entry_t> entries;
entries.push_back(pg_log_entry_t(pg_log_entry_t::ERROR, soid,
get_next_version(), eversion_t(), 0,
reqid, utime_t(), r));
bool in_hit_set,
ObjectContextRef *promote_obc)
{
+ // return quickly if caching is not enabled
+ if (pool.info.cache_mode == pg_pool_t::CACHEMODE_NONE)
+ return cache_result_t::NOOP;
+
if (op &&
op->get_req() &&
op->get_req()->get_type() == CEPH_MSG_OSD_OP &&
dout(20) << __func__ << ": ignoring cache due to flag" << dendl;
return cache_result_t::NOOP;
}
- // return quickly if caching is not enabled
- if (pool.info.cache_mode == pg_pool_t::CACHEMODE_NONE)
- return cache_result_t::NOOP;
must_promote = must_promote || op->need_promote();
osd->logger->inc(l_osd_op_cache_hit);
return cache_result_t::NOOP;
}
+ if (!is_primary()) {
+ dout(20) << __func__ << " cache miss; ask the primary" << dendl;
+ osd->reply_op_error(op, -EAGAIN);
+ return cache_result_t::REPLIED_WITH_EAGAIN;
+ }
if (missing_oid == hobject_t() && obc.get()) {
missing_oid = obc->obs.oi.soid;
}
};
-void PrimaryLogPG::do_proxy_read(OpRequestRef op)
+void PrimaryLogPG::do_proxy_read(OpRequestRef op, ObjectContextRef obc)
{
// NOTE: non-const here because the ProxyReadOp needs mutable refs to
// stash the result in the request's OSDOp vector
MOSDOp *m = static_cast<MOSDOp*>(op->get_nonconst_req());
- object_locator_t oloc(m->get_object_locator());
- oloc.pool = pool.info.tier_of;
-
- const hobject_t& soid = m->get_hobj();
+ object_locator_t oloc;
+ hobject_t soid;
+ /* extensible tier */
+ if (obc && obc->obs.exists && obc->obs.oi.has_manifest()) {
+ switch (obc->obs.oi.manifest.type) {
+ case object_manifest_t::TYPE_REDIRECT:
+ oloc = object_locator_t(obc->obs.oi.manifest.redirect_target);
+ soid = obc->obs.oi.manifest.redirect_target;
+ break;
+ case object_manifest_t::TYPE_CHUNKED:
+ default:
+ assert(0 == "unrecognized manifest type");
+ }
+ } else {
+ /* proxy */
+ soid = m->get_hobj();
+ oloc = object_locator_t(m->get_object_locator());
+ oloc.pool = pool.info.tier_of;
+ }
unsigned flags = CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_IGNORE_OVERLAY;
// pass through some original flags that make sense.
case CEPH_OSD_OP_SYNC_READ:
case CEPH_OSD_OP_SPARSE_READ:
case CEPH_OSD_OP_CHECKSUM:
+ case CEPH_OSD_OP_CMPEXT:
op.flags = (op.flags | CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL) &
~(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_FADVISE_NOCACHE);
}
osd->logger->inc(l_osd_tier_proxy_read);
const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
- OpContext *ctx = new OpContext(op, m->get_reqid(), prdop->ops, this);
+ OpContext *ctx = new OpContext(op, m->get_reqid(), &prdop->ops, this);
ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false);
ctx->user_at_version = prdop->user_version;
ctx->data_off = prdop->data_offset;
}
};
-void PrimaryLogPG::do_proxy_write(OpRequestRef op, const hobject_t& missing_oid)
+void PrimaryLogPG::do_proxy_write(OpRequestRef op, const hobject_t& missing_oid, ObjectContextRef obc)
{
// NOTE: non-const because ProxyWriteOp takes a mutable ref
MOSDOp *m = static_cast<MOSDOp*>(op->get_nonconst_req());
- object_locator_t oloc(m->get_object_locator());
- oloc.pool = pool.info.tier_of;
+ object_locator_t oloc;
SnapContext snapc(m->get_snap_seq(), m->get_snaps());
+ hobject_t soid;
+ /* extensible tier */
+ if (obc && obc->obs.exists && obc->obs.oi.has_manifest()) {
+ switch (obc->obs.oi.manifest.type) {
+ case object_manifest_t::TYPE_REDIRECT:
+ oloc = object_locator_t(obc->obs.oi.manifest.redirect_target);
+ soid = obc->obs.oi.manifest.redirect_target;
+ break;
+ case object_manifest_t::TYPE_CHUNKED:
+ default:
+ assert(0 == "unrecognized manifest type");
+ }
+ } else {
+ /* proxy */
+ soid = m->get_hobj();
+ oloc = object_locator_t(m->get_object_locator());
+ oloc.pool = pool.info.tier_of;
+ }
- const hobject_t& soid = m->get_hobj();
unsigned flags = CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_IGNORE_OVERLAY;
+ if (!(op->may_write() || op->may_cache())) {
+ flags |= CEPH_OSD_FLAG_RWORDERED;
+ }
dout(10) << __func__ << " Start proxy write for " << *m << dendl;
ProxyWriteOpRef pwop(std::make_shared<ProxyWriteOp>(op, soid, m->ops, m->get_reqid()));
- pwop->ctx = new OpContext(op, m->get_reqid(), pwop->ops, this);
+ pwop->ctx = new OpContext(op, m->get_reqid(), &pwop->ops, this);
pwop->mtime = m->get_mtime();
ObjectOperation obj_op;
ctx->at_version = get_next_version();
ctx->mtime = m->get_mtime();
- dout(10) << __func__ << " " << soid << " " << ctx->ops
+ dout(10) << __func__ << " " << soid << " " << *ctx->ops
<< " ov " << obc->obs.oi.version << " av " << ctx->at_version
<< " snapc " << ctx->snapc
<< " snapset " << obc->ssc->snapset
<< dendl;
} else {
- dout(10) << __func__ << " " << soid << " " << ctx->ops
+ dout(10) << __func__ << " " << soid << " " << *ctx->ops
<< " ov " << obc->obs.oi.version
<< dendl;
}
obc->ondisk_read_unlock();
}
- if (result == -EINPROGRESS) {
+ bool pending_async_reads = !ctx->pending_async_reads.empty();
+ if (result == -EINPROGRESS || pending_async_reads) {
// come back later.
+ if (pending_async_reads) {
+ in_progress_async_reads.push_back(make_pair(op, ctx));
+ ctx->start_async_reads(this);
+ }
return;
}
if (result >= 0)
do_osd_op_effects(ctx, m->get_connection());
- if (ctx->pending_async_reads.empty()) {
- complete_read_ctx(result, ctx);
- } else {
- in_progress_async_reads.push_back(make_pair(op, ctx));
- ctx->start_async_reads(this);
- }
-
+ complete_read_ctx(result, ctx);
return;
}
// save just what we need from ctx
MOSDOpReply *reply = ctx->reply;
ctx->reply = nullptr;
- reply->claim_op_out_data(ctx->ops);
- reply->get_header().data_off = ctx->data_off;
+ reply->claim_op_out_data(*ctx->ops);
+ reply->get_header().data_off = (ctx->data_off ? *ctx->data_off : 0);
close_op_ctx(ctx);
if (result == -ENOENT) {
repop->put();
}
+// Tear down an OpContext: release any object locks it holds, drop its
+// pending transaction, run (and erase) every queued on_finish callback,
+// then free the context itself.
+void PrimaryLogPG::close_op_ctx(OpContext *ctx) {
+  release_object_locks(ctx->lock_manager);
+
+  ctx->op_t.reset();
+
+  // erase-as-we-go: each callback is removed from the list before the
+  // next one runs
+  for (auto p = ctx->on_finish.begin(); p != ctx->on_finish.end();
+       ctx->on_finish.erase(p++)) {
+    (*p)();
+  }
+  delete ctx;
+}
+
void PrimaryLogPG::reply_ctx(OpContext *ctx, int r)
{
if (ctx->op)
assert(r == 0);
}
-PrimaryLogPG::OpContextUPtr PrimaryLogPG::trim_object(
- bool first, const hobject_t &coid)
+int PrimaryLogPG::trim_object(
+ bool first, const hobject_t &coid, PrimaryLogPG::OpContextUPtr *ctxp)
{
+ *ctxp = NULL;
// load clone info
bufferlist bl;
ObjectContextRef obc = get_object_context(coid, false, NULL);
- if (!obc) {
- derr << __func__ << " could not find coid " << coid << dendl;
- ceph_abort();
+ if (!obc || !obc->ssc || !obc->ssc->exists) {
+ osd->clog->error() << __func__ << ": Can not trim " << coid
+ << " repair needed " << (obc ? "(no obc->ssc or !exists)" : "(no obc)");
+ return -ENOENT;
}
- assert(obc->ssc);
hobject_t snapoid(
coid.oid, coid.get_key(),
obc->ssc->snapset.head_exists ? CEPH_NOSNAP:CEPH_SNAPDIR, coid.get_hash(),
info.pgid.pool(), coid.get_namespace());
ObjectContextRef snapset_obc = get_object_context(snapoid, false);
- assert(snapset_obc);
+ if (!snapset_obc) {
+ osd->clog->error() << __func__ << ": Can not trim " << coid
+ << " repair needed, no snapset obc for " << snapoid;
+ return -ENOENT;
+ }
SnapSet& snapset = obc->ssc->snapset;
bool legacy = snapset.is_legacy() ||
- !get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS);
+ get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS;
object_info_t &coi = obc->obs.oi;
set<snapid_t> old_snaps;
} else {
auto p = snapset.clone_snaps.find(coid.snap);
if (p == snapset.clone_snaps.end()) {
- osd->clog->error() << __func__ << " No clone_snaps in snapset " << snapset
- << " for " << coid << "\n";
- return NULL;
+ osd->clog->error() << "No clone_snaps in snapset " << snapset
+ << " for object " << coid << "\n";
+ return -ENOENT;
}
old_snaps.insert(snapset.clone_snaps[coid.snap].begin(),
snapset.clone_snaps[coid.snap].end());
}
if (old_snaps.empty()) {
- osd->clog->error() << __func__ << " No object info snaps for " << coid;
- return NULL;
+ osd->clog->error() << "No object info snaps for object " << coid;
+ return -ENOENT;
}
dout(10) << coid << " old_snaps " << old_snaps
<< " old snapset " << snapset << dendl;
if (snapset.seq == 0) {
- osd->clog->error() << __func__ << " No snapset.seq for " << coid;
- return NULL;
+ osd->clog->error() << "No snapset.seq for object " << coid;
+ return -ENOENT;
}
set<snapid_t> new_snaps;
if (new_snaps.empty()) {
p = std::find(snapset.clones.begin(), snapset.clones.end(), coid.snap);
if (p == snapset.clones.end()) {
- osd->clog->error() << __func__ << " Snap " << coid.snap << " not in clones";
- return NULL;
+ osd->clog->error() << "Snap " << coid.snap << " not in clones";
+ return -ENOENT;
}
}
first)) {
close_op_ctx(ctx.release());
dout(10) << __func__ << ": Unable to get a wlock on " << coid << dendl;
- return NULL;
+ return -ENOLCK;
}
if (!ctx->lock_manager.get_snaptrimmer_write(
first)) {
close_op_ctx(ctx.release());
dout(10) << __func__ << ": Unable to get a wlock on " << snapoid << dendl;
- return NULL;
+ return -ENOLCK;
}
ctx->at_version = get_next_version();
coid,
old_snaps,
new_snaps);
+
+ coi = object_info_t(coid);
+
ctx->at_version.version++;
} else {
// save adjusted snaps for this object
ctx->delta_stats.num_objects--;
if (oi.is_dirty()) {
ctx->delta_stats.num_objects_dirty--;
- oi.clear_flag(object_info_t::FLAG_DIRTY);
}
if (oi.is_omap())
ctx->delta_stats.num_objects_omap--;
if (oi.is_whiteout()) {
dout(20) << __func__ << " trimming whiteout on " << oi.soid << dendl;
ctx->delta_stats.num_whiteouts--;
- oi.clear_flag(object_info_t::FLAG_WHITEOUT);
}
- if (oi.is_cache_pinned())
+ if (oi.is_cache_pinned()) {
ctx->delta_stats.num_objects_pinned--;
+ }
}
ctx->snapset_obc->obs.exists = false;
-
+ ctx->snapset_obc->obs.oi = object_info_t(snapoid);
t->remove(snapoid);
} else {
dout(10) << coid << " filtering snapset on " << snapoid << dendl;
t->setattrs(snapoid, attrs);
}
- return ctx;
+ *ctxp = std::move(ctx);
+ return 0;
}
void PrimaryLogPG::kick_snap_trim()
}
}
-int PrimaryLogPG::do_extent_cmp(OpContext *ctx, OSDOp& osd_op)
-{
- ceph_osd_op& op = osd_op.op;
- vector<OSDOp> read_ops(1);
- OSDOp& read_op = read_ops[0];
- int result = 0;
-
- read_op.op.op = CEPH_OSD_OP_SYNC_READ;
- read_op.op.extent.offset = op.extent.offset;
- read_op.op.extent.length = op.extent.length;
- read_op.op.extent.truncate_seq = op.extent.truncate_seq;
- read_op.op.extent.truncate_size = op.extent.truncate_size;
-
- result = do_osd_ops(ctx, read_ops);
- if (result < 0) {
- derr << "do_extent_cmp do_osd_ops failed " << result << dendl;
- return result;
- }
-
- if (read_op.outdata.length() != osd_op.indata.length())
- return -EINVAL;
-
- for (uint64_t p = 0; p < osd_op.indata.length(); p++) {
- if (read_op.outdata[p] != osd_op.indata[p]) {
- return (-MAX_ERRNO - p);
- }
- }
-
- return result;
-}
-
int PrimaryLogPG::do_writesame(OpContext *ctx, OSDOp& osd_op)
{
ceph_osd_op& op = osd_op.op;
r(r), rval(rv), outdatap(blp), maybe_crc(mc),
size(size), osd(osd), soid(soid), flags(flags) {}
void finish(int len) override {
- *rval = len;
*r = len;
- if (len < 0)
+ if (len < 0) {
+ *rval = len;
return;
+ }
+ *rval = 0;
+
// whole object? can we verify the checksum?
if (maybe_crc && *r == size) {
uint32_t crc = outdatap->crc32c(-1);
};
struct ToSparseReadResult : public Context {
-  bufferlist& data_bl;
+  // out-param for the op's result: 0 on success, the errno on failure
+  int* result;
+  bufferlist* data_bl;
  uint64_t data_offset;
-  ceph_le64& len;
-  ToSparseReadResult(bufferlist& bl, uint64_t offset, ceph_le64& len):
-    data_bl(bl), data_offset(offset),len(len) {}
+  ceph_le64* len;
+  ToSparseReadResult(int* result, bufferlist* bl, uint64_t offset,
+		     ceph_le64* len)
+    : result(result), data_bl(bl), data_offset(offset),len(len) {}
  void finish(int r) override {
-    if (r < 0) return;
-    len = r;
+    if (r < 0) {
+      *result = r;
+      return;
+    }
+    *result = 0;
+    *len = r;
+    // Repackage the flat read as a sparse-read reply: a single-extent
+    // map {data_offset: r} followed by the data itself.
    bufferlist outdata;
    map<uint64_t, uint64_t> extents = {{data_offset, r}};
    ::encode(extents, outdata);
-    ::encode_destructively(data_bl, outdata);
-    data_bl.swap(outdata);
+    ::encode_destructively(*data_bl, outdata);
+    data_bl->swap(outdata);
  }
};
}
}
+// OpFinisher for async reads: by the time execute() runs, the read
+// completion has already stored its outcome in osd_op.rval, so finishing
+// the sub-op just reports that value.
+struct ReadFinisher : public PrimaryLogPG::OpFinisher {
+  OSDOp& osd_op;
+
+  ReadFinisher(OSDOp& osd_op) : osd_op(osd_op) {
+  }
+
+  int execute() override {
+    return osd_op.rval;
+  }
+};
+
struct C_ChecksumRead : public Context {
PrimaryLogPG *primary_log_pg;
OSDOp &osd_op;
&read_bl, maybe_crc, size,
osd, soid, flags)) {
}
+ ~C_ChecksumRead() override {
+ delete fill_extent_ctx;
+ }
void finish(int r) override {
fill_extent_ctx->complete(r);
+ fill_extent_ctx = nullptr;
if (osd_op.rval >= 0) {
bufferlist::iterator init_value_bl_it = init_value_bl.begin();
osd_op.rval = primary_log_pg->finish_checksum(osd_op, csum_type,
- &init_value_bl_it,
- read_bl);
+ &init_value_bl_it, read_bl);
}
}
};
int PrimaryLogPG::do_checksum(OpContext *ctx, OSDOp& osd_op,
- bufferlist::iterator *bl_it, bool *async_read)
+ bufferlist::iterator *bl_it)
{
dout(20) << __func__ << dendl;
auto checksum_ctx = new C_ChecksumRead(this, osd_op, csum_type,
std::move(init_value_bl), maybe_crc,
oi.size, osd, soid, op.flags);
+
ctx->pending_async_reads.push_back({
{op.checksum.offset, op.checksum.length, op.flags},
{&checksum_ctx->read_bl, checksum_ctx}});
dout(10) << __func__ << ": async_read noted for " << soid << dendl;
- *async_read = true;
- return 0;
+ ctx->op_finishers[ctx->current_osd_subop_num].reset(
+ new ReadFinisher(osd_op));
+ return -EINPROGRESS;
}
// sync read
- *async_read = false;
std::vector<OSDOp> read_ops(1);
auto& read_op = read_ops[0];
if (op.checksum.length > 0) {
return 0;
}
+// Completion for the async read issued by do_extent_cmp(): the wrapped
+// FillInVerifyExtent records the read length/result into read_length,
+// osd_op.rval and read_bl (verifying a whole-object CRC when maybe_crc is
+// set), after which finish() runs the actual byte comparison.
+struct C_ExtentCmpRead : public Context {
+  PrimaryLogPG *primary_log_pg;
+  OSDOp &osd_op;
+  ceph_le64 read_length;
+  bufferlist read_bl;
+  Context *fill_extent_ctx;
+
+  C_ExtentCmpRead(PrimaryLogPG *primary_log_pg, OSDOp &osd_op,
+                  boost::optional<uint32_t> maybe_crc, uint64_t size,
+                  OSDService *osd, hobject_t soid, __le32 flags)
+    : primary_log_pg(primary_log_pg), osd_op(osd_op),
+      fill_extent_ctx(new FillInVerifyExtent(&read_length, &osd_op.rval,
+                                             &read_bl, maybe_crc, size,
+                                             osd, soid, flags)) {
+  }
+  ~C_ExtentCmpRead() override {
+    // covers the path where finish() never ran
+    delete fill_extent_ctx;
+  }
+
+  void finish(int r) override {
+    if (r == -ENOENT) {
+      // object disappeared: compare against an empty buffer, which
+      // finish_extent_cmp treats as all zeroes
+      osd_op.rval = 0;
+      read_bl.clear();
+      delete fill_extent_ctx;
+    } else {
+      // NOTE(review): complete() is expected to free fill_extent_ctx
+      // after running it — confirm against Context::complete()
+      fill_extent_ctx->complete(r);
+    }
+    // either branch has released the context; clear so the dtor's
+    // unconditional delete stays safe
+    fill_extent_ctx = nullptr;
+
+    if (osd_op.rval >= 0) {
+      osd_op.rval = primary_log_pg->finish_extent_cmp(osd_op, read_bl);
+    }
+  }
+};
+
+// Handle CEPH_OSD_OP_CMPEXT: compare the client-supplied bytes against the
+// object's data at [extent.offset, extent.offset+extent.length).  The range
+// is first clamped against the (possibly truncated) object size.  Pools
+// that require rollback go through an async read (C_ExtentCmpRead) and
+// return -EINPROGRESS after staging a ReadFinisher; otherwise a synchronous
+// SYNC_READ is issued and compared inline via finish_extent_cmp().
+int PrimaryLogPG::do_extent_cmp(OpContext *ctx, OSDOp& osd_op)
+{
+  dout(20) << __func__ << dendl;
+  ceph_osd_op& op = osd_op.op;
+
+  auto& oi = ctx->new_obs.oi;
+  uint64_t size = oi.size;
+  if ((oi.truncate_seq < op.extent.truncate_seq) &&
+      (op.extent.offset + op.extent.length > op.extent.truncate_size)) {
+    size = op.extent.truncate_size;
+  }
+
+  if (op.extent.offset >= size) {
+    op.extent.length = 0;
+  } else if (op.extent.offset + op.extent.length > size) {
+    op.extent.length = size - op.extent.offset;
+  }
+
+  if (op.extent.length == 0) {
+    dout(20) << __func__ << " zero length extent" << dendl;
+    return finish_extent_cmp(osd_op, bufferlist{});
+  } else if (!ctx->obs->exists || ctx->obs->oi.is_whiteout()) {
+    dout(20) << __func__ << " object DNE" << dendl;
+    return finish_extent_cmp(osd_op, {});
+  } else if (pool.info.require_rollback()) {
+    // If there is a data digest and it is possible we are reading
+    // entire object, pass the digest.
+    // (was op.checksum.* — use the extent fields this op actually
+    // carries; offset/length alias in the ceph_osd_op union, so the
+    // behavior is unchanged, but this now matches the rest of the
+    // function)
+    boost::optional<uint32_t> maybe_crc;
+    if (oi.is_data_digest() && op.extent.offset == 0 &&
+        op.extent.length >= oi.size) {
+      maybe_crc = oi.data_digest;
+    }
+
+    // async read
+    auto& soid = oi.soid;
+    auto extent_cmp_ctx = new C_ExtentCmpRead(this, osd_op, maybe_crc, oi.size,
+                                              osd, soid, op.flags);
+    ctx->pending_async_reads.push_back({
+      {op.extent.offset, op.extent.length, op.flags},
+      {&extent_cmp_ctx->read_bl, extent_cmp_ctx}});
+
+    dout(10) << __func__ << ": async_read noted for " << soid << dendl;
+
+    ctx->op_finishers[ctx->current_osd_subop_num].reset(
+      new ReadFinisher(osd_op));
+    return -EINPROGRESS;
+  }
+
+  // sync read
+  vector<OSDOp> read_ops(1);
+  OSDOp& read_op = read_ops[0];
+
+  read_op.op.op = CEPH_OSD_OP_SYNC_READ;
+  read_op.op.extent.offset = op.extent.offset;
+  read_op.op.extent.length = op.extent.length;
+  read_op.op.extent.truncate_seq = op.extent.truncate_seq;
+  read_op.op.extent.truncate_size = op.extent.truncate_size;
+
+  int result = do_osd_ops(ctx, read_ops);
+  if (result < 0) {
+    derr << __func__ << " failed " << result << dendl;
+    return result;
+  }
+  return finish_extent_cmp(osd_op, read_op.outdata);
+}
+
+// Compare the client-supplied buffer (osd_op.indata) against the data we
+// read back.  Bytes beyond the end of the read compare as zeroes, so a
+// short or absent read behaves like zero-fill.  Returns 0 on a full match,
+// otherwise (-MAX_ERRNO - offset) identifying the first mismatching byte.
+int PrimaryLogPG::finish_extent_cmp(OSDOp& osd_op, const bufferlist &read_bl)
+{
+  const uint64_t cmp_len = osd_op.indata.length();
+  for (uint64_t i = 0; i < cmp_len; ++i) {
+    const char stored = (i < read_bl.length() ? read_bl[i] : 0);
+    if (osd_op.indata[i] != stored) {
+      // report the offset of the first mismatch to the client
+      return (-MAX_ERRNO - i);
+    }
+  }
+
+  return 0;
+}
+
+// Handle a (SYNC_)READ sub-op: clamp [extent.offset, extent.length)
+// against the object size and truncation state, then either stage an
+// async read (pools that require rollback) with digest verification via
+// FillInVerifyExtent + a ReadFinisher, or read synchronously and verify
+// the whole-object CRC inline.  Returns 0 on success / staged async work,
+// or a negative errno.
+int PrimaryLogPG::do_read(OpContext *ctx, OSDOp& osd_op) {
+  dout(20) << __func__ << dendl;
+  auto& op = osd_op.op;
+  auto& oi = ctx->new_obs.oi;
+  auto& soid = oi.soid;
+  __u32 seq = oi.truncate_seq;
+  uint64_t size = oi.size;
+  bool trimmed_read = false;
+
+  // are we beyond truncate_size?
+  if ( (seq < op.extent.truncate_seq) &&
+       (op.extent.offset + op.extent.length > op.extent.truncate_size) )
+    size = op.extent.truncate_size;
+
+  if (op.extent.length == 0) //length is zero mean read the whole object
+    op.extent.length = size;
+
+  if (op.extent.offset >= size) {
+    op.extent.length = 0;
+    trimmed_read = true;
+  } else if (op.extent.offset + op.extent.length > size) {
+    op.extent.length = size - op.extent.offset;
+    trimmed_read = true;
+  }
+
+  // read into a buffer
+  int result = 0;
+  if (trimmed_read && op.extent.length == 0) {
+    // read size was trimmed to zero and it is expected to do nothing
+    // a read operation of 0 bytes does *not* do nothing, this is why
+    // the trimmed_read boolean is needed
+  } else if (pool.info.require_rollback()) {
+    boost::optional<uint32_t> maybe_crc;
+    // If there is a data digest and it is possible we are reading
+    // entire object, pass the digest.  FillInVerifyExtent will
+    // will check the oi.size again.
+    if (oi.is_data_digest() && op.extent.offset == 0 &&
+        op.extent.length >= oi.size)
+      maybe_crc = oi.data_digest;
+    ctx->pending_async_reads.push_back(
+      make_pair(
+        boost::make_tuple(op.extent.offset, op.extent.length, op.flags),
+        make_pair(&osd_op.outdata,
+                  new FillInVerifyExtent(&op.extent.length, &osd_op.rval,
+                                         &osd_op.outdata, maybe_crc, oi.size,
+                                         osd, soid, op.flags))));
+    dout(10) << " async_read noted for " << soid << dendl;
+
+    ctx->op_finishers[ctx->current_osd_subop_num].reset(
+      new ReadFinisher(osd_op));
+  } else {
+    int r = pgbackend->objects_read_sync(
+      soid, op.extent.offset, op.extent.length, op.flags, &osd_op.outdata);
+    if (r == -EIO) {
+      // NOTE(review): presumably kicks off repair of the primary copy on a
+      // local read error; its return value replaces r — confirm semantics
+      r = rep_repair_primary_object(soid, ctx->op);
+    }
+    if (r >= 0)
+      op.extent.length = r;
+    else {
+      result = r;
+      op.extent.length = 0;
+    }
+    dout(10) << " read got " << r << " / " << op.extent.length
+	     << " bytes from obj " << soid << dendl;
+
+    // whole object? can we verify the checksum?
+    if (op.extent.length == oi.size && oi.is_data_digest()) {
+      uint32_t crc = osd_op.outdata.crc32c(-1);
+      if (oi.data_digest != crc) {
+        osd->clog->error() << info.pgid << std::hex
+			   << " full-object read crc 0x" << crc
+			   << " != expected 0x" << oi.data_digest
+			   << std::dec << " on " << soid;
+        // FIXME fall back to replica or something?
+        result = -EIO;
+      }
+    }
+  }
+
+  // XXX the op.extent.length is the requested length for async read
+  // On error this length is changed to 0 after the error comes back.
+  ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
+  ctx->delta_stats.num_rd++;
+  return result;
+}
+
+// Handle a CEPH_OSD_OP_SPARSE_READ.
+//
+// On success, osd_op.outdata holds the encoded extent map followed by
+// the extent data.  For EC pools (which cannot serve a true sparse
+// read) the request is converted into an async read of the in-use
+// byte range and completed later via a ReadFinisher; for replicated
+// pools the extent map is obtained synchronously with fiemap and each
+// extent is read in turn.
+//
+// Returns 0 on success (or once the async read is queued), or a
+// negative errno (-EINVAL for a truncate sequence, or a read error).
+int PrimaryLogPG::do_sparse_read(OpContext *ctx, OSDOp& osd_op) {
+  dout(20) << __func__ << dendl;
+  auto& op = osd_op.op;
+  auto& oi = ctx->new_obs.oi;
+  auto& soid = oi.soid;
+
+  if (op.extent.truncate_seq) {
+    dout(0) << "sparse_read does not support truncation sequence " << dendl;
+    return -EINVAL;
+  }
+
+  ++ctx->num_read;
+  if (pool.info.ec_pool()) {
+    // translate sparse read to a normal one if not supported
+    uint64_t offset = op.extent.offset;
+    uint64_t length = op.extent.length;
+    // clamp the requested range to the current object size
+    if (offset > oi.size) {
+      length = 0;
+    } else if (offset + length > oi.size) {
+      length = oi.size - offset;
+    }
+
+    if (length > 0) {
+      // ToSparseReadResult rewrites the plain read result into
+      // (extent map, data) form; op.extent.length is updated by the
+      // callback.
+      ctx->pending_async_reads.push_back(
+	make_pair(
+	  boost::make_tuple(offset, length, op.flags),
+	  make_pair(
+	    &osd_op.outdata,
+	    new ToSparseReadResult(&osd_op.rval, &osd_op.outdata, offset,
+				   &op.extent.length))));
+      dout(10) << " async_read (was sparse_read) noted for " << soid << dendl;
+
+      // re-executing the ctx after the async read completes will hit
+      // this finisher instead of re-issuing the read
+      ctx->op_finishers[ctx->current_osd_subop_num].reset(
+	new ReadFinisher(osd_op));
+    } else {
+      // nothing to read: reply with an empty extent map
+      dout(10) << " sparse read ended up empty for " << soid << dendl;
+      map<uint64_t, uint64_t> extents;
+      ::encode(extents, osd_op.outdata);
+    }
+  } else {
+    // read into a buffer
+    map<uint64_t, uint64_t> m;
+    uint32_t total_read = 0;
+    int r = osd->store->fiemap(ch, ghobject_t(soid, ghobject_t::NO_GEN,
+					      info.pgid.shard),
+			      op.extent.offset, op.extent.length, m);
+    if (r < 0) {
+      return r;
+    }
+
+    map<uint64_t, uint64_t>::iterator miter;
+    bufferlist data_bl;
+    uint64_t last = op.extent.offset;
+    for (miter = m.begin(); miter != m.end(); ++miter) {
+      // verify hole?  (debug option: holes between extents must read
+      // back as zeros; mismatches are logged, not fatal)
+      if (cct->_conf->osd_verify_sparse_read_holes &&
+          last < miter->first) {
+        bufferlist t;
+        uint64_t len = miter->first - last;
+        r = pgbackend->objects_read_sync(soid, last, len, op.flags, &t);
+	if (r < 0) {
+	  osd->clog->error() << coll << " " << soid
+			     << " sparse-read failed to read: "
+			     << r;
+	} else if (!t.is_zero()) {
+	  osd->clog->error() << coll << " " << soid
+			     << " sparse-read found data in hole "
+			     << last << "~" << len;
+        }
+      }
+
+      bufferlist tmpbl;
+      r = pgbackend->objects_read_sync(soid, miter->first, miter->second,
+				       op.flags, &tmpbl);
+      if (r == -EIO) {
+        r = rep_repair_primary_object(soid, ctx->op);
+      }
+      if (r < 0) {
+	return r;
+      }
+
+      // this usually happens when we get an extent that exceeds the
+      // actual file size
+      if (r < (int)miter->second)
+        miter->second = r;
+      total_read += r;
+      dout(10) << "sparse-read " << miter->first << "@" << miter->second
+	       << dendl;
+      data_bl.claim_append(tmpbl);
+      last = miter->first + r;
+    }
+
+    // NOTE(review): defensive re-check; negative read results already
+    // return from inside the loop above
+    if (r < 0) {
+      return r;
+    }
+
+    // verify trailing hole?
+    if (cct->_conf->osd_verify_sparse_read_holes) {
+      uint64_t end = MIN(op.extent.offset + op.extent.length, oi.size);
+      if (last < end) {
+        bufferlist t;
+        uint64_t len = end - last;
+        r = pgbackend->objects_read_sync(soid, last, len, op.flags, &t);
+	if (r < 0) {
+	  osd->clog->error() << coll << " " << soid
+			     << " sparse-read failed to read: " << r;
+	} else if (!t.is_zero()) {
+	  osd->clog->error() << coll << " " << soid
+			     << " sparse-read found data in hole "
+			     << last << "~" << len;
+        }
+      }
+    }
+
+    // Verify the digest when the sparse read happened to cover the
+    // whole object.  librbd always uses sparse-read, so with continued
+    // use more and more fully-written objects are read this way and
+    // the whole-object checksum check is worthwhile here too.
+    if (total_read == oi.size && oi.is_data_digest()) {
+      uint32_t crc = data_bl.crc32c(-1);
+      if (oi.data_digest != crc) {
+        osd->clog->error() << info.pgid << std::hex
+          << " full-object read crc 0x" << crc
+          << " != expected 0x" << oi.data_digest
+          << std::dec << " on " << soid;
+        // FIXME fall back to replica or something?
+        return -EIO;
+      }
+    }
+
+    op.extent.length = total_read;
+
+    ::encode(m, osd_op.outdata); // re-encode since it might be modified
+    ::encode_destructively(data_bl, osd_op.outdata);
+
+    dout(10) << " sparse_read got " << total_read << " bytes from object "
+	     << soid << dendl;
+  }
+
+  // account the (possibly clamped) returned length against read stats
+  ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
+  ctx->delta_stats.num_rd++;
+  return 0;
+}
+
int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
{
int result = 0;
object_info_t& oi = obs.oi;
const hobject_t& soid = oi.soid;
- bool first_read = true;
-
PGTransaction* t = ctx->op_t.get();
dout(10) << "do_osd_op " << soid << " " << ops << dendl;
- for (vector<OSDOp>::iterator p = ops.begin(); p != ops.end(); ++p, ctx->current_osd_subop_num++) {
+ ctx->current_osd_subop_num = 0;
+ for (auto p = ops.begin(); p != ops.end(); ++p, ctx->current_osd_subop_num++, ctx->processed_subop_count++) {
OSDOp& osd_op = *p;
ceph_osd_op& op = osd_op.op;
+ OpFinisher* op_finisher = nullptr;
+ {
+ auto op_finisher_it = ctx->op_finishers.find(ctx->current_osd_subop_num);
+ if (op_finisher_it != ctx->op_finishers.end()) {
+ op_finisher = op_finisher_it->second.get();
+ }
+ }
+
// TODO: check endianness (__le32 vs uint32_t, etc.)
// The fields in ceph_osd_op are little-endian (according to the definition in rados.h),
// but the code in this function seems to treat them as native-endian. What should the
case CEPH_OSD_OP_COPY_FROM: // we handle user_version update explicitly
case CEPH_OSD_OP_CACHE_PIN:
case CEPH_OSD_OP_CACHE_UNPIN:
+ case CEPH_OSD_OP_SET_REDIRECT:
break;
default:
if (op.op & CEPH_OSD_OP_MODE_WR)
case CEPH_OSD_OP_CMPEXT:
++ctx->num_read;
- tracepoint(osd, do_osd_op_pre_extent_cmp, soid.oid.name.c_str(), soid.snap.val, oi.size, oi.truncate_seq, op.extent.offset, op.extent.length, op.extent.truncate_size, op.extent.truncate_seq);
- result = do_extent_cmp(ctx, osd_op);
+ tracepoint(osd, do_osd_op_pre_extent_cmp, soid.oid.name.c_str(),
+ soid.snap.val, oi.size, oi.truncate_seq, op.extent.offset,
+ op.extent.length, op.extent.truncate_size,
+ op.extent.truncate_seq);
+
+ if (op_finisher == nullptr) {
+ result = do_extent_cmp(ctx, osd_op);
+ } else {
+ result = op_finisher->execute();
+ }
break;
case CEPH_OSD_OP_SYNC_READ:
result = -EOPNOTSUPP;
break;
}
- // fall through
- case CEPH_OSD_OP_READ:
- ++ctx->num_read;
- {
- __u32 seq = oi.truncate_seq;
- uint64_t size = oi.size;
- tracepoint(osd, do_osd_op_pre_read, soid.oid.name.c_str(), soid.snap.val, size, seq, op.extent.offset, op.extent.length, op.extent.truncate_size, op.extent.truncate_seq);
- bool trimmed_read = false;
- // are we beyond truncate_size?
- if ( (seq < op.extent.truncate_seq) &&
- (op.extent.offset + op.extent.length > op.extent.truncate_size) )
- size = op.extent.truncate_size;
-
- if (op.extent.length == 0) //length is zero mean read the whole object
- op.extent.length = size;
-
- if (op.extent.offset >= size) {
- op.extent.length = 0;
- trimmed_read = true;
- } else if (op.extent.offset + op.extent.length > size) {
- op.extent.length = size - op.extent.offset;
- trimmed_read = true;
- }
-
- // read into a buffer
- bool async = false;
- if (trimmed_read && op.extent.length == 0) {
- // read size was trimmed to zero and it is expected to do nothing
- // a read operation of 0 bytes does *not* do nothing, this is why
- // the trimmed_read boolean is needed
- } else if (pool.info.require_rollback()) {
- async = true;
- boost::optional<uint32_t> maybe_crc;
- // If there is a data digest and it is possible we are reading
- // entire object, pass the digest. FillInVerifyExtent will
- // will check the oi.size again.
- if (oi.is_data_digest() && op.extent.offset == 0 &&
- op.extent.length >= oi.size)
- maybe_crc = oi.data_digest;
- ctx->pending_async_reads.push_back(
- make_pair(
- boost::make_tuple(op.extent.offset, op.extent.length, op.flags),
- make_pair(&osd_op.outdata,
- new FillInVerifyExtent(&op.extent.length, &osd_op.rval,
- &osd_op.outdata, maybe_crc, oi.size, osd,
- soid, op.flags))));
- dout(10) << " async_read noted for " << soid << dendl;
- } else {
- int r = pgbackend->objects_read_sync(
- soid, op.extent.offset, op.extent.length, op.flags, &osd_op.outdata);
- if (r >= 0)
- op.extent.length = r;
- else {
- result = r;
- op.extent.length = 0;
- }
- dout(10) << " read got " << r << " / " << op.extent.length
- << " bytes from obj " << soid << dendl;
-
- // whole object? can we verify the checksum?
- if (op.extent.length == oi.size && oi.is_data_digest()) {
- uint32_t crc = osd_op.outdata.crc32c(-1);
- if (oi.data_digest != crc) {
- osd->clog->error() << info.pgid << std::hex
- << " full-object read crc 0x" << crc
- << " != expected 0x" << oi.data_digest
- << std::dec << " on " << soid;
- // FIXME fall back to replica or something?
- result = -EIO;
- }
- }
- }
- if (first_read) {
- first_read = false;
+ // fall through
+ case CEPH_OSD_OP_READ:
+ ++ctx->num_read;
+ tracepoint(osd, do_osd_op_pre_read, soid.oid.name.c_str(),
+ soid.snap.val, oi.size, oi.truncate_seq, op.extent.offset,
+ op.extent.length, op.extent.truncate_size,
+ op.extent.truncate_seq);
+ if (op_finisher == nullptr) {
+ if (!ctx->data_off) {
ctx->data_off = op.extent.offset;
}
- // XXX the op.extent.length is the requested length for async read
- // On error this length is changed to 0 after the error comes back.
- ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
- ctx->delta_stats.num_rd++;
-
- // Skip checking the result and just proceed to the next operation
- if (async)
- continue;
-
+ result = do_read(ctx, osd_op);
+ } else {
+ result = op_finisher->execute();
}
break;
op.checksum.offset, op.checksum.length,
op.checksum.chunk_size);
- bool async_read;
- result = do_checksum(ctx, osd_op, &bp, &async_read);
- if (result == 0 && async_read) {
- continue;
+ if (op_finisher == nullptr) {
+ result = do_checksum(ctx, osd_op, &bp);
+ } else {
+ result = op_finisher->execute();
}
}
break;
/* map extents */
case CEPH_OSD_OP_SPARSE_READ:
- tracepoint(osd, do_osd_op_pre_sparse_read, soid.oid.name.c_str(), soid.snap.val, oi.size, oi.truncate_seq, op.extent.offset, op.extent.length, op.extent.truncate_size, op.extent.truncate_seq);
- if (op.extent.truncate_seq) {
- dout(0) << "sparse_read does not support truncation sequence " << dendl;
- result = -EINVAL;
- break;
- }
- ++ctx->num_read;
- if (pool.info.ec_pool()) {
- // translate sparse read to a normal one if not supported
- uint64_t offset = op.extent.offset;
- uint64_t length = op.extent.length;
- if (offset > oi.size) {
- length = 0;
- } else if (offset + length > oi.size) {
- length = oi.size - offset;
- }
- if (length > 0) {
- ctx->pending_async_reads.push_back(
- make_pair(
- boost::make_tuple(offset, length, op.flags),
- make_pair(
- &osd_op.outdata,
- new ToSparseReadResult(
- osd_op.outdata, offset,
- op.extent.length /* updated by the callback */))));
- dout(10) << " async_read (was sparse_read) noted for " << soid << dendl;
- } else {
- dout(10) << " sparse read ended up empty for " << soid << dendl;
- map<uint64_t, uint64_t> extents;
- ::encode(extents, osd_op.outdata);
- }
+ tracepoint(osd, do_osd_op_pre_sparse_read, soid.oid.name.c_str(),
+ soid.snap.val, oi.size, oi.truncate_seq, op.extent.offset,
+ op.extent.length, op.extent.truncate_size,
+ op.extent.truncate_seq);
+ if (op_finisher == nullptr) {
+ result = do_sparse_read(ctx, osd_op);
} else {
- // read into a buffer
- map<uint64_t, uint64_t> m;
- uint32_t total_read = 0;
- int r = osd->store->fiemap(ch, ghobject_t(soid, ghobject_t::NO_GEN,
- info.pgid.shard),
- op.extent.offset, op.extent.length, m);
- if (r < 0) {
- result = r;
- break;
- }
- map<uint64_t, uint64_t>::iterator miter;
- bufferlist data_bl;
- uint64_t last = op.extent.offset;
- for (miter = m.begin(); miter != m.end(); ++miter) {
- // verify hole?
- if (cct->_conf->osd_verify_sparse_read_holes &&
- last < miter->first) {
- bufferlist t;
- uint64_t len = miter->first - last;
- r = pgbackend->objects_read_sync(soid, last, len, op.flags, &t);
- if (r < 0) {
- osd->clog->error() << coll << " " << soid
- << " sparse-read failed to read: "
- << r;
- } else if (!t.is_zero()) {
- osd->clog->error() << coll << " " << soid << " sparse-read found data in hole "
- << last << "~" << len;
- }
- }
-
- bufferlist tmpbl;
- r = pgbackend->objects_read_sync(soid, miter->first, miter->second, op.flags, &tmpbl);
- if (r < 0) {
- result = r;
- break;
- }
-
- if (r < (int)miter->second) /* this is usually happen when we get extent that exceeds the actual file size */
- miter->second = r;
- total_read += r;
- dout(10) << "sparse-read " << miter->first << "@" << miter->second << dendl;
- data_bl.claim_append(tmpbl);
- last = miter->first + r;
- }
-
- if (r < 0) {
- result = r;
- break;
- }
-
- // verify trailing hole?
- if (cct->_conf->osd_verify_sparse_read_holes) {
- uint64_t end = MIN(op.extent.offset + op.extent.length, oi.size);
- if (last < end) {
- bufferlist t;
- uint64_t len = end - last;
- r = pgbackend->objects_read_sync(soid, last, len, op.flags, &t);
- if (r < 0) {
- osd->clog->error() << coll << " " << soid
- << " sparse-read failed to read: "
- << r;
- } else if (!t.is_zero()) {
- osd->clog->error() << coll << " " << soid << " sparse-read found data in hole "
- << last << "~" << len;
- }
- }
- }
-
- // Why SPARSE_READ need checksum? In fact, librbd always use sparse-read.
- // Maybe at first, there is no much whole objects. With continued use, more and more whole object exist.
- // So from this point, for spare-read add checksum make sense.
- if (total_read == oi.size && oi.is_data_digest()) {
- uint32_t crc = data_bl.crc32c(-1);
- if (oi.data_digest != crc) {
- osd->clog->error() << info.pgid << std::hex
- << " full-object read crc 0x" << crc
- << " != expected 0x" << oi.data_digest
- << std::dec << " on " << soid;
- // FIXME fall back to replica or something?
- result = -EIO;
- break;
- }
- }
-
- op.extent.length = total_read;
-
- ::encode(m, osd_op.outdata); // re-encode since it might be modified
- ::encode_destructively(data_bl, osd_op.outdata);
-
- dout(10) << " sparse_read got " << total_read << " bytes from object " << soid << dendl;
+ result = op_finisher->execute();
}
- ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
- ctx->delta_stats.num_rd++;
break;
case CEPH_OSD_OP_CALL:
map<string, bufferlist> out;
result = getattrs_maybe_cache(
ctx->obc,
- &out,
- true);
+ &out);
bufferlist bl;
::encode(out, bl);
}
break;
+ case CEPH_OSD_OP_SET_REDIRECT:
+ ++ctx->num_write;
+ {
+ if (pool.info.is_tier()) {
+ result = -EINVAL;
+ break;
+ }
+ if (!obs.exists) {
+ result = -ENOENT;
+ break;
+ }
+ if (get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS) {
+ result = -EOPNOTSUPP;
+ break;
+ }
+
+ object_t target_name;
+ object_locator_t target_oloc;
+ snapid_t target_snapid = (uint64_t)op.copy_from.snapid;
+ version_t target_version = op.copy_from.src_version;
+ try {
+ ::decode(target_name, bp);
+ ::decode(target_oloc, bp);
+ }
+ catch (buffer::error& e) {
+ result = -EINVAL;
+ goto fail;
+ }
+ pg_t raw_pg;
+ get_osdmap()->object_locator_to_pg(target_name, target_oloc, raw_pg);
+ hobject_t target(target_name, target_oloc.key, target_snapid,
+ raw_pg.ps(), raw_pg.pool(),
+ target_oloc.nspace);
+ if (target == soid) {
+ dout(20) << " set-redirect self is invalid" << dendl;
+ result = -EINVAL;
+ break;
+ }
+ oi.set_flag(object_info_t::FLAG_MANIFEST);
+ oi.manifest.redirect_target = target;
+ oi.manifest.type = object_manifest_t::TYPE_REDIRECT;
+ t->truncate(soid, 0);
+ if (oi.is_omap() && pool.info.supports_omap()) {
+ t->omap_clear(soid);
+ obs.oi.clear_omap_digest();
+ obs.oi.clear_flag(object_info_t::FLAG_OMAP);
+ }
+ ctx->delta_stats.num_bytes -= oi.size;
+ oi.size = 0;
+ oi.new_object();
+ oi.user_version = target_version;
+ ctx->user_at_version = target_version;
+ /* rm_attrs */
+ map<string,bufferlist> rmattrs;
+ result = getattrs_maybe_cache(ctx->obc,
+ &rmattrs);
+ if (result < 0) {
+ return result;
+ }
+ map<string, bufferlist>::iterator iter;
+ for (iter = rmattrs.begin(); iter != rmattrs.end(); ++iter) {
+ const string& name = iter->first;
+ t->rmattr(soid, name);
+ }
+ dout(10) << "set-redirect oid:" << oi.soid << " user_version: " << oi.user_version << dendl;
+ }
+
+ break;
// -- object attrs --
case CEPH_OSD_OP_COPY_GET:
++ctx->num_read;
- tracepoint(osd, do_osd_op_pre_copy_get, soid.oid.name.c_str(), soid.snap.val);
- result = fill_in_copy_get(ctx, bp, osd_op, ctx->obc);
+ tracepoint(osd, do_osd_op_pre_copy_get, soid.oid.name.c_str(),
+ soid.snap.val);
+ if (op_finisher == nullptr) {
+ result = do_copy_get(ctx, bp, osd_op, ctx->obc);
+ } else {
+ result = op_finisher->execute();
+ }
break;
case CEPH_OSD_OP_COPY_FROM:
src_oloc.hash,
src_snapid,
src_version);
- if (!ctx->copy_cb) {
+ if (op_finisher == nullptr) {
// start
pg_t raw_pg;
get_osdmap()->object_locator_to_pg(src_name, src_oloc, raw_pg);
result = -EINVAL;
break;
}
- CopyFromCallback *cb = new CopyFromCallback(ctx);
- ctx->copy_cb = cb;
+ CopyFromCallback *cb = new CopyFromCallback(ctx, osd_op);
+ ctx->op_finishers[ctx->current_osd_subop_num].reset(
+ new CopyFromFinisher(cb));
start_copy(cb, ctx->obc, src, src_oloc, src_version,
op.copy_from.flags,
false,
result = -EINPROGRESS;
} else {
// finish
- assert(ctx->copy_cb->get_result() >= 0);
- finish_copyfrom(ctx);
- result = 0;
+ result = op_finisher->execute();
+ assert(result == 0);
+
+ // COPY_FROM cannot be executed multiple times -- it must restart
+ ctx->op_finishers.erase(ctx->current_osd_subop_num);
}
}
break;
whiteout = true;
}
bool legacy;
- if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
+ if (get_osdmap()->require_osd_release >= CEPH_RELEASE_LUMINOUS) {
legacy = false;
// in luminous or later, we can't delete the head if there are
// clones. we trust the caller passing no_whiteout has already
}
}
} else {
- legacy = false;
+ legacy = true;
}
dout(20) << __func__ << " " << soid << " whiteout=" << (int)whiteout
<< " no_whiteout=" << (int)no_whiteout
&rollback_to, false, false, &missing_oid);
if (ret == -EAGAIN) {
/* clone must be missing */
- assert(is_missing_object(missing_oid));
- dout(20) << "_rollback_to attempted to roll back to a missing object "
+ assert(is_degraded_or_backfilling_object(missing_oid));
+ dout(20) << "_rollback_to attempted to roll back to a missing or backfilling clone "
<< missing_oid << " (requested snapid: ) " << snapid << dendl;
block_write_on_degraded_snap(missing_oid, ctx->op);
return ret;
}
{
ObjectContextRef promote_obc;
- switch (
- maybe_handle_cache_detail(
- ctx->op,
- true,
- rollback_to,
- ret,
- missing_oid,
- true,
- false,
- &promote_obc)) {
+ cache_result_t tier_mode_result;
+ if (obs.exists && obs.oi.has_manifest()) {
+ tier_mode_result =
+ maybe_handle_manifest_detail(
+ ctx->op,
+ true,
+ rollback_to);
+ } else {
+ tier_mode_result =
+ maybe_handle_cache_detail(
+ ctx->op,
+ true,
+ rollback_to,
+ ret,
+ missing_oid,
+ true,
+ false,
+ &promote_obc);
+ }
+ switch (tier_mode_result) {
case cache_result_t::NOOP:
break;
case cache_result_t::BLOCKED_PROMOTE:
case cache_result_t::BLOCKED_FULL:
block_write_on_full_cache(soid, ctx->op);
return -EAGAIN;
+ case cache_result_t::REPLIED_WITH_EAGAIN:
+ assert(0 == "this can't happen, no rollback on replica");
default:
assert(0 == "must promote was set, other values are not valid");
return -EAGAIN;
snap_oi->copy_user_bits(ctx->obs->oi);
bool legacy = ctx->new_snapset.is_legacy() ||
- !get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS);
+ get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS;
if (legacy) {
snap_oi->legacy_snaps = snaps;
}
// update snapset with latest snap context
ctx->new_snapset.seq = snapc.seq;
ctx->new_snapset.snaps = snapc.snaps;
- if (!get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
+ if (get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS) {
// pessimistic assumption that this is a net-new legacy SnapSet
ctx->delta_stats.num_legacy_snapsets++;
ctx->new_snapset.head_exists = ctx->new_obs.exists;
int PrimaryLogPG::prepare_transaction(OpContext *ctx)
{
- assert(!ctx->ops.empty());
-
+ assert(!ctx->ops->empty());
+
const hobject_t& soid = ctx->obs->oi.soid;
// valid snap context?
}
// prepare the actual mutation
- int result = do_osd_ops(ctx, ctx->ops);
+ int result = do_osd_ops(ctx, *ctx->ops);
if (result < 0) {
if (ctx->op->may_write() &&
- get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) {
+ get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
// need to save the error code in the pg log, to detect dup ops,
// but do nothing else
ctx->update_log_only = true;
if (ctx->op_t->empty() && !ctx->modify) {
unstable_stats.add(ctx->delta_stats);
if (ctx->op->may_write() &&
- get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) {
+ get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
ctx->update_log_only = true;
}
return result;
info.pgid.pool(), soid.get_namespace());
dout(10) << " final snapset " << ctx->new_snapset
<< " in " << snapoid << dendl;
- assert(!get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS));
+ assert(get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS);
ctx->log.push_back(pg_log_entry_t(pg_log_entry_t::MODIFY, snapoid,
ctx->at_version,
eversion_t(),
}
bool legacy_snapset = ctx->new_snapset.is_legacy() ||
- !get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS);
+ get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS;
// append to log
ctx->log.push_back(pg_log_entry_t(log_op_type, soid, ctx->at_version,
const MOSDOp *m = static_cast<const MOSDOp*>(ctx->op->get_req());
assert(ctx->async_reads_complete());
- for (vector<OSDOp>::iterator p = ctx->ops.begin();
- p != ctx->ops.end() && result >= 0; ++p) {
+ for (vector<OSDOp>::iterator p = ctx->ops->begin();
+ p != ctx->ops->end() && result >= 0; ++p) {
if (p->rval < 0 && !(p->op.flags & CEPH_OSD_OP_FLAG_FAILOK)) {
result = p->rval;
break;
}
ctx->bytes_read += p->outdata.length();
}
- ctx->reply->claim_op_out_data(ctx->ops);
- ctx->reply->get_header().data_off = ctx->data_off;
+ ctx->reply->claim_op_out_data(*ctx->ops);
+ ctx->reply->get_header().data_off = (ctx->data_off ? *ctx->data_off : 0);
MOSDOpReply *reply = ctx->reply;
ctx->reply = nullptr;
C_CopyFrom_AsyncReadCb(OSDOp *osd_op, uint64_t features) :
osd_op(osd_op), features(features), len(0) {}
void finish(int r) override {
+ osd_op->rval = r;
+ if (r < 0) {
+ return;
+ }
+
assert(len > 0);
assert(len <= reply_obj.data.length());
bufferlist bl;
}
};
-int PrimaryLogPG::fill_in_copy_get(
- OpContext *ctx,
- bufferlist::iterator& bp,
- OSDOp& osd_op,
- ObjectContextRef &obc)
+int PrimaryLogPG::do_copy_get(OpContext *ctx, bufferlist::iterator& bp,
+ OSDOp& osd_op, ObjectContextRef &obc)
{
object_info_t& oi = obc->obs.oi;
hobject_t& soid = oi.soid;
if (!cursor.attr_complete) {
result = getattrs_maybe_cache(
ctx->obc,
- &out_attrs,
- true);
+ &out_attrs);
if (result < 0) {
if (cb) {
delete cb;
make_pair(
boost::make_tuple(cursor.data_offset, max_read, osd_op.op.flags),
make_pair(&bl, cb)));
- result = max_read;
- cb->len = result;
+ cb->len = max_read;
+
+ ctx->op_finishers[ctx->current_osd_subop_num].reset(
+ new ReadFinisher(osd_op));
+ result = -EINPROGRESS;
+
+ dout(10) << __func__ << ": async_read noted for " << soid << dendl;
} else {
result = pgbackend->objects_read_sync(
- oi.soid, cursor.data_offset, left, osd_op.op.flags, &bl);
+ oi.soid, cursor.data_offset, max_read, osd_op.op.flags, &bl);
if (result < 0)
return result;
}
- assert(result <= left);
- left -= result;
- cursor.data_offset += result;
+ left -= max_read;
+ cursor.data_offset += max_read;
}
if (cursor.data_offset == oi.size) {
cursor.data_complete = true;
if (cb && !async_read_started) {
delete cb;
}
- result = 0;
+
+ if (result > 0) {
+ result = 0;
+ }
return result;
}
cop->temp_cursor = cop->cursor;
}
-void PrimaryLogPG::finish_copyfrom(OpContext *ctx)
+void PrimaryLogPG::finish_copyfrom(CopyFromCallback *cb)
{
+ OpContext *ctx = cb->ctx;
dout(20) << "finish_copyfrom on " << ctx->obs->oi.soid << dendl;
- ObjectState& obs = ctx->new_obs;
- CopyFromCallback *cb = static_cast<CopyFromCallback*>(ctx->copy_cb);
+ ObjectState& obs = ctx->new_obs;
if (obs.exists) {
dout(20) << __func__ << ": exists, removing" << dendl;
ctx->op_t->remove(obs.oi.soid);
tctx->extra_reqids = results->reqids;
bool legacy_snapset = tctx->new_snapset.is_legacy() ||
- !get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS);
+ get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS;
if (whiteout) {
// create a whiteout
assert(tctx->new_obs.oi.soid.snap == CEPH_NOSNAP);
tctx->new_snapset.from_snap_set(
results->snapset,
- !get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS));
+ get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS);
}
tctx->new_snapset.head_exists = true;
dout(20) << __func__ << " new_snapset " << tctx->new_snapset << dendl;
last_update_applied = applied_version;
if (is_primary()) {
if (scrubber.active) {
- if (last_update_applied == scrubber.subset_last_update) {
- requeue_scrub();
+ if (last_update_applied >= scrubber.subset_last_update) {
+ if (ops_blocked_by_scrub()) {
+ requeue_scrub(true);
+ } else {
+ requeue_scrub(false);
+ }
+
}
} else {
assert(scrubber.start == scrubber.end);
}
} else {
if (scrubber.active_rep_scrub) {
- if (last_update_applied == static_cast<const MOSDRepScrub*>(
+ if (last_update_applied >= static_cast<const MOSDRepScrub*>(
scrubber.active_rep_scrub->get_req())->scrub_to) {
osd->enqueue_back(
info.pgid,
PrimaryLogPG::OpContextUPtr PrimaryLogPG::simple_opc_create(ObjectContextRef obc)
{
dout(20) << __func__ << " " << obc->obs.oi.soid << dendl;
- vector<OSDOp> ops;
ceph_tid_t rep_tid = osd->get_tid();
osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
- OpContextUPtr ctx(new OpContext(OpRequestRef(), reqid, ops, obc, this));
+ OpContextUPtr ctx(new OpContext(OpRequestRef(), reqid, nullptr, obc, this));
ctx->op_t.reset(new PGTransaction());
ctx->mtime = ceph_clock_now();
return ctx;
dout(20) << __func__ << " " << repop << dendl;
issue_repop(repop, ctx.get());
eval_repop(repop);
+ calc_trim_to();
repop->put();
}
void PrimaryLogPG::submit_log_entries(
- const mempool::osd::list<pg_log_entry_t> &entries,
+ const mempool::osd_pglog::list<pg_log_entry_t> &entries,
ObcLockManager &&manager,
boost::optional<std::function<void(void)> > &&_on_complete,
OpRequestRef op,
boost::intrusive_ptr<RepGather> repop;
boost::optional<std::function<void(void)> > on_complete;
- if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+ if (get_osdmap()->require_osd_release >= CEPH_RELEASE_JEWEL) {
repop = new_repop(
version,
r,
if (peer == pg_whoami) continue;
assert(peer_missing.count(peer));
assert(peer_info.count(peer));
- if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+ if (get_osdmap()->require_osd_release >= CEPH_RELEASE_JEWEL) {
assert(repop);
MOSDPGUpdateLogMissing *m = new MOSDPGUpdateLogMissing(
entries,
peer.osd, m, get_osdmap()->get_epoch());
}
}
- if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+ if (get_osdmap()->require_osd_release >= CEPH_RELEASE_JEWEL) {
ceph_tid_t rep_tid = repop->rep_tid;
waiting_on.insert(pg_whoami);
log_entry_update_waiting_on.insert(
object_info_t oi(soid);
SnapSetContext *ssc = get_snapset_context(
soid, true, 0, false);
+ assert(ssc);
obc = create_object_context(oi, ssc);
dout(10) << __func__ << ": " << obc << " " << soid
<< " " << obc->rwstate
dout(10) << __func__ << ": creating obc from disk: " << obc
<< dendl;
}
- assert(obc->ssc);
+
+ // XXX: Caller doesn't expect this
+ if (obc->ssc == NULL) {
+ derr << __func__ << ": obc->ssc not available, not returning context" << dendl;
+ return ObjectContextRef(); // -ENOENT!
+ }
+
dout(10) << __func__ << ": " << obc << " " << soid
<< " " << obc->rwstate
<< " oi: " << obc->obs.oi
ObjectContextRef obc = get_object_context(soid, false);
if (!obc || !obc->obs.exists) {
- dout(20) << __func__ << " missing clone " << soid << dendl;
if (pmissing)
*pmissing = soid;
put_snapset_context(ssc);
- return -ENOENT;
+ if (is_degraded_or_backfilling_object(soid)) {
+ dout(20) << __func__ << " clone is degraded or backfilling " << soid << dendl;
+ return -EAGAIN;
+ } else {
+ dout(20) << __func__ << " missing clone " << soid << dendl;
+ return -ENOENT;
+ }
}
if (!obc->ssc) {
_register_snapset_context(ssc);
if (bv.length()) {
bufferlist::iterator bvp = bv.begin();
- ssc->snapset.decode(bvp);
+ try {
+ ssc->snapset.decode(bvp);
+ } catch (buffer::error& e) {
+ dout(0) << __func__ << " Can't decode snapset: " << e << dendl;
+ return NULL;
+ }
ssc->exists = true;
} else {
ssc->exists = false;
return PULL_NONE;
}
+ if (missing_loc.is_deleted(soid)) {
+ start_recovery_op(soid);
+ assert(!recovering.count(soid));
+ recovering.insert(make_pair(soid, ObjectContextRef()));
+ epoch_t cur_epoch = get_osdmap()->get_epoch();
+ remove_missing_object(soid, v, new FunctionContext(
+ [=](int) {
+ lock();
+ if (!pg_has_reset_since(cur_epoch)) {
+ bool object_missing = false;
+ for (const auto& shard : actingbackfill) {
+ if (shard == pg_whoami)
+ continue;
+ if (peer_missing[shard].is_missing(soid)) {
+ dout(20) << __func__ << ": soid " << soid << " needs to be deleted from replica " << shard << dendl;
+ object_missing = true;
+ break;
+ }
+ }
+ if (!object_missing) {
+ object_stat_sum_t stat_diff;
+ stat_diff.num_objects_recovered = 1;
+ on_global_recover(soid, stat_diff, true);
+ } else {
+ auto recovery_handle = pgbackend->open_recovery_op();
+ pgbackend->recover_delete_object(soid, v, recovery_handle);
+ pgbackend->run_recovery_op(recovery_handle, priority);
+ }
+ }
+ unlock();
+ }));
+ return PULL_YES;
+ }
+
// is this a snapped object? if so, consult the snapset.. we may not need the entire object!
ObjectContextRef obc;
ObjectContextRef head_obc;
start_recovery_op(soid);
assert(!recovering.count(soid));
recovering.insert(make_pair(soid, obc));
- pgbackend->recover_object(
+ int r = pgbackend->recover_object(
soid,
v,
head_obc,
obc,
h);
+ // This is only a pull which shouldn't return an error
+ assert(r >= 0);
return PULL_YES;
}
osd->send_message_osd_cluster(peer.osd, subop, get_osdmap()->get_epoch());
}
+// Locally delete an object that is missing/deleted, then record the
+// recovery.  Two chained transactions are queued: the first removes
+// the snap-mapped object; once it completes, a second transaction is
+// built by on_local_recover() and on_complete is attached to it.  If
+// the PG has reset since this was queued, on_complete fires with
+// -EAGAIN instead.  on_complete must be non-null and is consumed.
+void PrimaryLogPG::remove_missing_object(const hobject_t &soid,
+			      eversion_t v, Context *on_complete)
+{
+  dout(20) << __func__ << " " << soid << " " << v << dendl;
+  assert(on_complete != nullptr);
+  // delete locally
+  ObjectStore::Transaction t;
+  remove_snap_mapped_object(t, soid);
+
+  ObjectRecoveryInfo recovery_info;
+  recovery_info.soid = soid;
+  recovery_info.version = v;
+
+  // capture the epoch now so the completion can detect a PG reset
+  epoch_t cur_epoch = get_osdmap()->get_epoch();
+  t.register_on_complete(new FunctionContext(
+    [=](int) {
+      lock();
+      if (!pg_has_reset_since(cur_epoch)) {
+	ObjectStore::Transaction t2;
+	on_local_recover(soid, recovery_info, ObjectContextRef(), true, &t2);
+	t2.register_on_complete(on_complete);
+	int r = osd->store->queue_transaction(osr.get(), std::move(t2), nullptr);
+	assert(r == 0);
+	unlock();
+      } else {
+	unlock();
+	// PG was reset while the delete was in flight; let the caller retry
+	on_complete->complete(-EAGAIN);
+      }
+    }));
+  int r = osd->store->queue_transaction(osr.get(), std::move(t), nullptr);
+  assert(r == 0);
+}
void PrimaryLogPG::finish_degraded_object(const hobject_t& oid)
{
dout(10) << "finish_degraded_object " << oid << dendl;
- ObjectContextRef obc(object_contexts.lookup(oid));
if (callbacks_for_degraded_object.count(oid)) {
list<Context*> contexts;
contexts.swap(callbacks_for_degraded_object[oid]);
void PrimaryLogPG::_applied_recovered_object(ObjectContextRef obc)
{
lock();
- dout(10) << "_applied_recovered_object " << *obc << dendl;
-
+ dout(20) << __func__ << dendl;
+ // obc may be null now (e.g. when the recovered "object" was a delete),
+ // so only dereference it for logging when it is present
+ if (obc) {
+ dout(20) << "obc = " << *obc << dendl;
+ }
assert(active_pushes >= 1);
--active_pushes;
// requeue an active chunky scrub waiting on recovery ops
if (!deleting && active_pushes == 0
&& scrubber.is_chunky_scrub_active()) {
- requeue_scrub();
+ // requeue at high priority when client ops are blocked on this scrub
+ if (ops_blocked_by_scrub()) {
+ requeue_scrub(true);
+ } else {
+ requeue_scrub(false);
+ }
}
-
unlock();
}
void PrimaryLogPG::_applied_recovered_object_replica()
{
lock();
- dout(10) << "_applied_recovered_object_replica" << dendl;
-
+ dout(20) << __func__ << dendl;
assert(active_pushes >= 1);
--active_pushes;
PGQueueable(scrubber.active_rep_scrub, get_osdmap()->get_epoch()));
scrubber.active_rep_scrub = OpRequestRef();
}
-
unlock();
}
}
}
+// A local read/recovery failure on the primary is handled like a
+// failed push reported by the primary shard itself.
+void PrimaryLogPG::primary_failed(const hobject_t &soid)
+{
+ list<pg_shard_t> fl = { pg_whoami };
+ failed_push(fl, soid);
+}
+
void PrimaryLogPG::failed_push(const list<pg_shard_t> &from, const hobject_t &soid)
{
dout(20) << __func__ << ": " << soid << dendl;
if (*i == get_primary()) continue;
pg_shard_t peer = *i;
if (!peer_missing[peer].is_missing(oid)) {
- assert(is_backfill_targets(peer));
continue;
}
eversion_t h = peer_missing[peer].get_items().at(oid).have;
unlock();
});
- if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) {
+ if (get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
t.register_on_commit(complete);
} else {
/* Hack to work around the fact that ReplicatedBackend sends
ceph_tid_t tid)
{
dout(3) << __func__ << " " << pg_log_entry_t::get_op_name(what) << dendl;
+ list<hobject_t> oids;
dout(30) << __func__ << ": log before:\n";
pg_log.get_log().print(*_dout);
*_dout << dendl;
- mempool::osd::list<pg_log_entry_t> log_entries;
+ mempool::osd_pglog::list<pg_log_entry_t> log_entries;
utime_t mtime = ceph_clock_now();
map<hobject_t, pg_missing_item>::const_iterator m =
{
pg_log_entry_t e(pg_log_entry_t::LOST_DELETE, oid, v, m->second.need,
0, osd_reqid_t(), mtime, 0);
- if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+ if (get_osdmap()->require_osd_release >= CEPH_RELEASE_JEWEL) {
if (pool.info.require_rollback()) {
e.mod_desc.try_rmobject(v.version);
} else {
} // otherwise, just do what we used to do
dout(10) << e << dendl;
log_entries.push_back(e);
+ oids.push_back(oid);
+
+ // If an object context is found, mark the object as deleted in case
+ // we are racing with a new creation. This can happen if the object
+ // was lost and an EIO occurred at the primary.
+ obc = object_contexts.lookup(oid);
+ if (obc)
+ obc->obs.exists = false;
++v.version;
++m;
log_entries,
std::move(manager),
boost::optional<std::function<void(void)> >(
- [=]() {
- requeue_ops(waiting_for_all_missing);
- waiting_for_all_missing.clear();
- for (auto& p : waiting_for_unreadable_object) {
- release_backoffs(p.first);
+ [this, oids, con, num_unfound, tid]() {
+ if (perform_deletes_during_peering()) {
+ for (auto oid : oids) {
+ // clear old locations - merge_new_log_entries will have
+ // handled rebuilding missing_loc for each of these
+ // objects if we have the RECOVERY_DELETES flag
+ missing_loc.recovered(oid);
+ }
+ }
+
+ if (is_recovery_unfound()) {
+ queue_peering_event(
+ CephPeeringEvtRef(
+ std::make_shared<CephPeeringEvt>(
+ get_osdmap()->get_epoch(),
+ get_osdmap()->get_epoch(),
+ DoRecovery())));
+ } else if (is_backfill_unfound()) {
+ queue_peering_event(
+ CephPeeringEvtRef(
+ std::make_shared<CephPeeringEvt>(
+ get_osdmap()->get_epoch(),
+ get_osdmap()->get_epoch(),
+ RequestBackfill())));
+ } else {
+ queue_recovery();
}
- requeue_object_waiters(waiting_for_unreadable_object);
- queue_recovery();
stringstream ss;
ss << "pg has " << num_unfound
assert(flushes_in_progress > 0);
flushes_in_progress--;
if (flushes_in_progress == 0) {
- requeue_ops(waiting_for_peered);
+ requeue_ops(waiting_for_flush);
}
if (!is_peered() || !is_primary()) {
pair<hobject_t, ObjectContextRef> i;
on_shutdown();
}
+// Close out every op context still waiting on an in-progress async
+// read. Called from on_shutdown() so these contexts (and the resources
+// they hold) do not leak when the PG goes away.
+void PrimaryLogPG::clear_async_reads()
+{
+ dout(10) << __func__ << dendl;
+ for(auto& i : in_progress_async_reads) {
+ dout(10) << "clear ctx: "
+ << "OpRequestRef " << i.first
+ << " OpContext " << i.second
+ << dendl;
+ close_op_ctx(i.second);
+ }
+}
+
void PrimaryLogPG::on_shutdown()
{
dout(10) << "on_shutdown" << dendl;
// handles queue races
deleting = true;
+ if (recovery_queued) {
+ recovery_queued = false;
+ osd->clear_queued_recovery(this);
+ }
+
clear_scrub_reserved();
scrub_clear_state();
cancel_proxy_ops(false);
apply_and_flush_repops(false);
cancel_log_updates();
+ // we must remove PGRefs, so do this prior to release_backoffs() callers
+ clear_backoffs();
+ // clean up snap trim references
+ snap_trimmer_machine.process_event(Reset());
pgbackend->on_change();
context_registry_on_change();
object_contexts.clear();
+ clear_async_reads();
+
osd->remote_reserver.cancel_reservation(info.pgid);
osd->local_reserver.cancel_reservation(info.pgid);
RequestBackfill())));
} else {
dout(10) << "activate all replicas clean, no recovery" << dendl;
+ eio_errors_to_process = false;
queue_peering_event(
CephPeeringEvtRef(
std::make_shared<CephPeeringEvt>(
void PrimaryLogPG::_on_new_interval()
{
+ dout(20) << __func__ << " checking missing set deletes flag. missing = " << pg_log.get_missing() << dendl;
+ // Upgrade the missing set to the deletes-aware representation the
+ // first time the RECOVERY_DELETES osdmap flag is observed; after
+ // that the two must stay in sync (asserted below).
+ if (!pg_log.get_missing().may_include_deletes &&
+ get_osdmap()->test_flag(CEPH_OSDMAP_RECOVERY_DELETES)) {
+ pg_log.rebuild_missing_set_with_deletes(osd->store, coll, info);
+ }
+ assert(pg_log.get_missing().may_include_deletes == get_osdmap()->test_flag(CEPH_OSDMAP_RECOVERY_DELETES));
}
void PrimaryLogPG::on_change(ObjectStore::Transaction *t)
// requeue everything in the reverse order they should be
// reexamined.
requeue_ops(waiting_for_peered);
+ requeue_ops(waiting_for_flush);
requeue_ops(waiting_for_active);
clear_scrub_reserved();
if (is_primary()) {
requeue_ops(waiting_for_cache_not_full);
- requeue_ops(waiting_for_all_missing);
} else {
waiting_for_cache_not_full.clear();
- waiting_for_all_missing.clear();
}
objects_blocked_on_cache_full.clear();
assert(is_primary());
if (!state_test(PG_STATE_RECOVERING) &&
- !state_test(PG_STATE_BACKFILL)) {
+ !state_test(PG_STATE_BACKFILLING)) {
/* TODO: I think this case is broken and will make do_recovery()
* unhappy since we're returning false */
dout(10) << "recovery raced and were queued twice, ignoring!" << dendl;
return false;
}
- const pg_missing_t &missing = pg_log.get_missing();
+ const auto &missing = pg_log.get_missing();
unsigned int num_missing = missing.num_missing();
uint64_t num_unfound = get_num_unfound();
bool deferred_backfill = false;
if (recovering.empty() &&
- state_test(PG_STATE_BACKFILL) &&
+ state_test(PG_STATE_BACKFILLING) &&
!backfill_targets.empty() && started < max &&
missing.num_missing() == 0 &&
waiting_on_backfill.empty()) {
if (missing.num_missing() > 0) {
// this shouldn't happen!
- osd->clog->error() << info.pgid << " recovery ending with " << missing.num_missing()
- << ": " << missing.get_items();
+ osd->clog->error() << info.pgid << " Unexpected Error: recovery ending with "
+ << missing.num_missing() << ": " << missing.get_items();
return work_in_progress;
}
if (needs_recovery()) {
// this shouldn't happen!
// We already checked num_missing() so we must have missing replicas
- osd->clog->error() << info.pgid << " recovery ending with missing replicas";
+ osd->clog->error() << info.pgid
+ << " Unexpected Error: recovery ending with missing replicas";
return work_in_progress;
}
if (state_test(PG_STATE_RECOVERING)) {
state_clear(PG_STATE_RECOVERING);
+ state_clear(PG_STATE_FORCED_RECOVERY);
if (needs_backfill()) {
dout(10) << "recovery done, queuing backfill" << dendl;
queue_peering_event(
RequestBackfill())));
} else {
dout(10) << "recovery done, no backfill" << dendl;
+ eio_errors_to_process = false;
+ state_clear(PG_STATE_FORCED_BACKFILL);
queue_peering_event(
CephPeeringEvtRef(
std::make_shared<CephPeeringEvt>(
AllReplicasRecovered())));
}
} else { // backfilling
- state_clear(PG_STATE_BACKFILL);
+ state_clear(PG_STATE_BACKFILLING);
+ state_clear(PG_STATE_FORCED_BACKFILL);
+ state_clear(PG_STATE_FORCED_RECOVERY);
dout(10) << "recovery done, backfill done" << dendl;
+ eio_errors_to_process = false;
queue_peering_event(
CephPeeringEvtRef(
std::make_shared<CephPeeringEvt>(
{
assert(is_primary());
- const pg_missing_t &missing = pg_log.get_missing();
+ const auto &missing = pg_log.get_missing();
dout(10) << "recover_primary recovering " << recovering.size()
<< " in pg" << dendl;
if (pg_log.get_log().objects.count(p->second)) {
latest = pg_log.get_log().objects.find(p->second)->second;
- assert(latest->is_update());
+ assert(latest->is_update() || latest->is_delete());
soid = latest->soid;
} else {
latest = 0;
const pg_missing_item& item = missing.get_items().find(p->second)->second;
++p;
- hobject_t head = soid;
- head.snap = CEPH_NOSNAP;
+ hobject_t head = soid.get_head();
eversion_t need = item.need;
return started;
}
+// Record that the primary's copy of soid at version v is bad/missing:
+// add it to the missing set, drop the primary as a known location, and
+// re-add every peer that is not missing it as a candidate source.
+// Returns true if no other copy was found (the object is unfound).
+bool PrimaryLogPG::primary_error(
+ const hobject_t& soid, eversion_t v)
+{
+ pg_log.missing_add(soid, v, eversion_t());
+ // reset so recovery rescans from the start and picks this object up
+ pg_log.set_last_requested(0);
+ missing_loc.remove_location(soid, pg_whoami);
+ bool uhoh = true;
+ assert(!actingbackfill.empty());
+ for (set<pg_shard_t>::iterator i = actingbackfill.begin();
+ i != actingbackfill.end();
+ ++i) {
+ if (*i == get_primary()) continue;
+ pg_shard_t peer = *i;
+ if (!peer_missing[peer].is_missing(soid, v)) {
+ missing_loc.add_location(soid, peer);
+ dout(10) << info.pgid << " unexpectedly missing " << soid << " v" << v
+ << ", there should be a copy on shard " << peer << dendl;
+ uhoh = false;
+ }
+ }
+ if (uhoh)
+ osd->clog->error() << info.pgid << " missing primary copy of " << soid << ", unfound";
+ else
+ osd->clog->error() << info.pgid << " missing primary copy of " << soid
+ << ", will try copies on " << missing_loc.get_locations(soid);
+ return uhoh;
+}
+
+// Queue propagation of a delete of soid (at version v) to replicas via
+// the backend recovery handle. Note the null ObjectContextRef in the
+// recovering map: there is no object to recover, only a deletion.
+// Returns the number of recovery ops started (always 1).
+int PrimaryLogPG::prep_object_replica_deletes(
+ const hobject_t& soid, eversion_t v,
+ PGBackend::RecoveryHandle *h)
+{
+ assert(is_primary());
+ dout(10) << __func__ << ": on " << soid << dendl;
+
+ start_recovery_op(soid);
+ assert(!recovering.count(soid));
+ recovering.insert(make_pair(soid, ObjectContextRef()));
+
+ pgbackend->recover_delete_object(soid, v, h);
+ return 1;
+}
+
int PrimaryLogPG::prep_object_replica_pushes(
const hobject_t& soid, eversion_t v,
PGBackend::RecoveryHandle *h)
// NOTE: we know we will get a valid oloc off of disk here.
ObjectContextRef obc = get_object_context(soid, false);
if (!obc) {
- pg_log.missing_add(soid, v, eversion_t());
- missing_loc.remove_location(soid, pg_whoami);
- bool uhoh = true;
- assert(!actingbackfill.empty());
- for (set<pg_shard_t>::iterator i = actingbackfill.begin();
- i != actingbackfill.end();
- ++i) {
- if (*i == get_primary()) continue;
- pg_shard_t peer = *i;
- if (!peer_missing[peer].is_missing(soid, v)) {
- missing_loc.add_location(soid, peer);
- dout(10) << info.pgid << " unexpectedly missing " << soid << " v" << v
- << ", there should be a copy on shard " << peer << dendl;
- uhoh = false;
- }
- }
- if (uhoh)
- osd->clog->error() << info.pgid << " missing primary copy of " << soid << ", unfound";
- else
- osd->clog->error() << info.pgid << " missing primary copy of " << soid
- << ", will try copies on " << missing_loc.get_locations(soid);
+ primary_error(soid, v);
return 0;
}
* In almost all cases, therefore, this lock should be uncontended.
*/
obc->ondisk_read_lock();
- pgbackend->recover_object(
+ int r = pgbackend->recover_object(
soid,
v,
ObjectContextRef(),
obc, // has snapset context
h);
obc->ondisk_read_unlock();
+ if (r < 0) {
+ dout(0) << __func__ << " Error " << r << " on oid " << soid << dendl;
+ primary_failed(soid);
+ primary_error(soid, v);
+ return 0;
+ }
return 1;
}
handle.reset_tp_timeout();
const hobject_t soid(p->second);
+ if (missing_loc.is_unfound(soid)) {
+ dout(10) << __func__ << ": " << soid << " still unfound" << dendl;
+ continue;
+ }
+
if (soid > pi->second.last_backfill) {
if (!recovering.count(soid)) {
+ derr << __func__ << ": object " << soid << " last_backfill " << pi->second.last_backfill << dendl;
derr << __func__ << ": object added to missing set for backfill, but "
<< "is not in recovering, error!" << dendl;
ceph_abort();
continue;
}
- if (missing_loc.is_unfound(soid)) {
- dout(10) << __func__ << ": " << soid << " still unfound" << dendl;
+ if (missing_loc.is_deleted(soid)) {
+ dout(10) << __func__ << ": " << soid << " is a delete, removing" << dendl;
+ map<hobject_t,pg_missing_item>::const_iterator r = m.get_items().find(soid);
+ started += prep_object_replica_deletes(soid, r->second.need, h);
continue;
}
update_range(&backfill_info, handle);
unsigned ops = 0;
- vector<boost::tuple<hobject_t, eversion_t,
- ObjectContextRef, vector<pg_shard_t> > > to_push;
vector<boost::tuple<hobject_t, eversion_t, pg_shard_t> > to_remove;
set<hobject_t> add_to_stat;
}
backfill_info.trim_to(last_backfill_started);
+ PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
while (ops < max) {
if (backfill_info.begin <= earliest_peer_backfill() &&
!backfill_info.extends_to_end() && backfill_info.empty()) {
vector<pg_shard_t> all_push = need_ver_targs;
all_push.insert(all_push.end(), missing_targs.begin(), missing_targs.end());
- to_push.push_back(
- boost::tuple<hobject_t, eversion_t, ObjectContextRef, vector<pg_shard_t> >
- (backfill_info.begin, obj_v, obc, all_push));
- // Count all simultaneous pushes of the same object as a single op
+ handle.reset_tp_timeout();
+ int r = prep_backfill_object_push(backfill_info.begin, obj_v, obc, all_push, h);
+ if (r < 0) {
+ *work_started = true;
+ dout(0) << __func__ << " Error " << r << " trying to backfill " << backfill_info.begin << dendl;
+ break;
+ }
ops++;
} else {
*work_started = true;
}
}
- PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
- for (unsigned i = 0; i < to_push.size(); ++i) {
- handle.reset_tp_timeout();
- prep_backfill_object_push(to_push[i].get<0>(), to_push[i].get<1>(),
- to_push[i].get<2>(), to_push[i].get<3>(), h);
- }
pgbackend->run_recovery_op(h, get_recovery_op_priority());
dout(5) << "backfill_pos is " << backfill_pos << dendl;
return ops;
}
-void PrimaryLogPG::prep_backfill_object_push(
+// Now returns the backend's result so the caller can stop backfilling
+// this object when the local read fails.
+int PrimaryLogPG::prep_backfill_object_push(
hobject_t oid, eversion_t v,
ObjectContextRef obc,
vector<pg_shard_t> peers,
PGBackend::RecoveryHandle *h)
{
- dout(10) << "push_backfill_object " << oid << " v " << v << " to peers " << peers << dendl;
+ dout(10) << __func__ << " " << oid << " v " << v << " to peers " << peers << dendl;
assert(!peers.empty());
backfills_in_flight.insert(oid);
for (unsigned int i = 0 ; i < peers.size(); ++i) {
map<pg_shard_t, pg_missing_t>::iterator bpm = peer_missing.find(peers[i]);
assert(bpm != peer_missing.end());
- bpm->second.add(oid, eversion_t(), eversion_t());
+ bpm->second.add(oid, eversion_t(), eversion_t(), false);
}
assert(!recovering.count(oid));
// We need to take the read_lock here in order to flush in-progress writes
obc->ondisk_read_lock();
- pgbackend->recover_object(
+ int r = pgbackend->recover_object(
oid,
v,
ObjectContextRef(),
obc,
h);
obc->ondisk_read_unlock();
+ // recover_object() can now fail locally (e.g. a read error on the
+ // primary); mark the primary copy bad and undo the backfill
+ // bookkeeping done above
+ if (r < 0) {
+ dout(0) << __func__ << " Error " << r << " on oid " << oid << dendl;
+ primary_failed(oid);
+ primary_error(oid, v);
+ backfills_in_flight.erase(oid);
+ missing_loc.add_missing(oid, v, eversion_t());
+ }
+ return r;
}
void PrimaryLogPG::update_range(
continue;
did.insert(p->soid);
- if (p->is_delete()) {
+ if (p->is_delete() && !is_missing_object(p->soid)) {
dout(10) << " checking " << p->soid
<< " at " << p->version << dendl;
struct stat st;
is_active()) {
if (op)
requeue_op(op);
+ requeue_ops(waiting_for_flush);
requeue_ops(waiting_for_active);
requeue_ops(waiting_for_scrub);
requeue_ops(waiting_for_cache_not_full);
if (!allow_incomplete_clones) {
next_clone.snap = **curclone;
clog->error() << mode << " " << pgid << " " << head.get()
- << " expected clone " << next_clone;
+ << " expected clone " << next_clone << " " << missing
+ << " missing";
++scrubber.shallow_errors;
e.set_clone_missing(next_clone.snap);
}
<< " snapset.head_exists=false, but head exists";
++scrubber.shallow_errors;
head_error.set_head_mismatch();
+ // Fix head_exists locally so is_legacy() returns correctly
+ snapset->head_exists = true;
}
if (soid.is_snapdir() && snapset->head_exists) {
osd->clog->error() << mode << " " << info.pgid << " " << soid
<< " snapset.head_exists=true, but snapdir exists";
++scrubber.shallow_errors;
head_error.set_head_mismatch();
+ // For symmetry fix this too, but probably doesn't matter
+ snapset->head_exists = false;
}
- if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
+ if (get_osdmap()->require_osd_release >= CEPH_RELEASE_LUMINOUS) {
if (soid.is_snapdir()) {
dout(10) << " will move snapset to head from " << soid << dendl;
snapset_to_repair[soid.get_head()] = *snapset;
ObjectContextRef obc = get_object_context(p->first, false);
if (!obc) {
osd->clog->error() << info.pgid << " " << mode
- << " cannot get object context for "
+ << " cannot get object context for object "
<< p->first;
continue;
} else if (obc->obs.oi.soid != p->first) {
ObjectContextRef obc = get_object_context(p.first, true);
if (!obc) {
osd->clog->error() << info.pgid << " " << mode
- << " cannot get object context for "
+ << " cannot get object context for object "
<< p.first;
continue;
} else if (obc->obs.oi.soid != p.first) {
publish_stats_to_osd();
share_pg_info();
}
+ // Clear object context cache to get repair information
+ if (repair)
+ object_contexts.clear();
}
bool PrimaryLogPG::check_osdmap_full(const set<pg_shard_t> &missing_on)
return osd->check_osdmap_full(missing_on);
}
+// Repair the primary's copy of soid after a read error (e.g. EIO) on a
+// client op, replicated pools only: mark the primary copy missing,
+// park the op on waiting_for_unreadable_object, and queue a DoRecovery
+// peering event (once per batch of errors) so the object is pulled
+// from a replica. Always returns -EAGAIN so the caller retries the op
+// after recovery completes.
+int PrimaryLogPG::rep_repair_primary_object(const hobject_t& soid, OpRequestRef op)
+{
+ // Only supports replicated pools
+ assert(!pool.info.require_rollback());
+ assert(is_primary());
+
+ dout(10) << __func__ << " " << soid
+ << " peers osd.{" << actingbackfill << "}" << dendl;
+
+ if (!is_clean()) {
+ // repair needs a clean PG; park the op until the PG is clean
+ block_for_clean(soid, op);
+ return -EAGAIN;
+ }
+
+ assert(!pg_log.get_missing().is_missing(soid));
+ bufferlist bv;
+ object_info_t oi;
+ eversion_t v;
+ // try to learn the object's version from its local object_info xattr
+ int r = get_pgbackend()->objects_get_attr(soid, OI_ATTR, &bv);
+ if (r < 0) {
+ // Leave v and try to repair without a version, getting attr failed
+ dout(0) << __func__ << ": Need version of replica, objects_get_attr failed: "
+ << soid << " error=" << r << dendl;
+ } else try {
+ bufferlist::iterator bliter = bv.begin();
+ ::decode(oi, bliter);
+ v = oi.version;
+ } catch (...) {
+ // Leave v as default constructed. This will fail when sent to older OSDs, but
+ // not much worse than failing here.
+ dout(0) << __func__ << ": Need version of replica, bad object_info_t: " << soid << dendl;
+ }
+
+ missing_loc.add_missing(soid, v, eversion_t());
+ if (primary_error(soid, v)) {
+ dout(0) << __func__ << " No other replicas available for " << soid << dendl;
+ // XXX: If we knew that there is no down osd which could include this
+ // object, it would be nice if we could return EIO here.
+ // If a "never fail" flag was available, that could be used
+ // for rbd to NOT return EIO until object marked lost.
+
+ // Drop through to save this op in case an osd comes up with the object.
+ }
+
+ // Restart the op after object becomes readable again
+ waiting_for_unreadable_object[soid].push_back(op);
+ op->mark_delayed("waiting for missing object");
+
+ if (!eio_errors_to_process) {
+ // first error in this batch: kick off recovery via a peering event
+ eio_errors_to_process = true;
+ assert(is_clean());
+ queue_peering_event(
+ CephPeeringEvtRef(
+ std::make_shared<CephPeeringEvt>(
+ get_osdmap()->get_epoch(),
+ get_osdmap()->get_epoch(),
+ DoRecovery())));
+ } else {
+ // A prior error must have already cleared clean state and queued recovery
+ // or a map change has triggered re-peering.
+ // Not inlining the recovery by calling maybe_kick_recovery(soid);
+ dout(5) << __func__<< ": Read error on " << soid << ", but already seen errors" << dendl;
+ }
+
+ return -EAGAIN;
+}
+
/*---SnapTrimmer Logging---*/
#undef dout_prefix
#define dout_prefix *_dout << pg->gen_prefix()
}
if (pg->scrubber.active) {
ldout(pg->cct, 10) << " scrubbing, will requeue snap_trimmer after" << dendl;
- pg->scrubber.queue_snap_trim = true;
return transit< WaitScrub >();
} else {
return transit< Trimming >();
context< SnapTrimmer >().log_enter(state_name);
context< SnapTrimmer >().pg->osd->queue_for_snap_trim(pg);
pg->state_set(PG_STATE_SNAPTRIM);
+ pg->state_clear(PG_STATE_SNAPTRIM_ERROR);
pg->publish_stats_to_osd();
}
for (auto &&object: to_trim) {
// Get next
ldout(pg->cct, 10) << "AwaitAsyncWork react trimming " << object << dendl;
- OpContextUPtr ctx = pg->trim_object(in_flight.empty(), object);
- if (!ctx) {
- ldout(pg->cct, 10) << "could not get write lock on obj "
- << object << dendl;
- if (in_flight.empty()) {
+ OpContextUPtr ctx;
+ int error = pg->trim_object(in_flight.empty(), object, &ctx);
+ if (error) {
+ if (error == -ENOLCK) {
+ ldout(pg->cct, 10) << "could not get write lock on obj "
+ << object << dendl;
+ } else {
+ pg->state_set(PG_STATE_SNAPTRIM_ERROR);
+ ldout(pg->cct, 10) << "Snaptrim error=" << error << dendl;
+ }
+ if (!in_flight.empty()) {
+ ldout(pg->cct, 10) << "letting the ones we already started finish" << dendl;
+ return transit< WaitRepops >();
+ }
+ if (error == -ENOLCK) {
ldout(pg->cct, 10) << "waiting for it to clear"
<< dendl;
return transit< WaitRWLock >();
-
} else {
- ldout(pg->cct, 10) << "letting the ones we already started finish" << dendl;
- return transit< WaitRepops >();
+ return transit< NotTrimming >();
}
}
[pg, object, &in_flight]() {
assert(in_flight.find(object) != in_flight.end());
in_flight.erase(object);
- if (in_flight.empty())
- pg->snap_trimmer_machine.process_event(RepopsComplete());
+ if (in_flight.empty()) {
+ if (pg->state_test(PG_STATE_SNAPTRIM_ERROR)) {
+ pg->snap_trimmer_machine.process_event(Reset());
+ } else {
+ pg->snap_trimmer_machine.process_event(RepopsComplete());
+ }
+ }
});
pg->simple_opc_submit(std::move(ctx));
int PrimaryLogPG::getattrs_maybe_cache(
ObjectContextRef obc,
- map<string, bufferlist> *out,
- bool user_only)
+ map<string, bufferlist> *out)
{
int r = 0;
+ // out is now mandatory; the optional/user_only variants were dropped
+ assert(out);
if (pool.info.require_rollback()) {
- if (out)
- *out = obc->attr_cache;
+ *out = obc->attr_cache;
} else {
r = pgbackend->objects_get_attrs(obc->obs.oi.soid, out);
}
- if (out && user_only) {
- map<string, bufferlist> tmp;
- for (map<string, bufferlist>::iterator i = out->begin();
- i != out->end();
- ++i) {
- if (i->first.size() > 1 && i->first[0] == '_')
- tmp[i->first.substr(1, i->first.size())].claim(i->second);
- }
- tmp.swap(*out);
+ // unconditionally keep only user attrs (keys with a leading '_'),
+ // stripping that prefix; callers always get user-visible names now
+ map<string, bufferlist> tmp;
+ for (map<string, bufferlist>::iterator i = out->begin();
+ i != out->end();
+ ++i) {
+ if (i->first.size() > 1 && i->first[0] == '_')
+ tmp[i->first.substr(1, i->first.size())].claim(i->second);
}
+ tmp.swap(*out);
return r;
}