PrimaryLogPG *pg,
PrimaryLogPG::OpContext *ctx) : pg(pg), opcontext(ctx) {}
void finish(int r) override {
- if (r < 0)
- opcontext->async_read_result = r;
opcontext->finish_read(pg);
}
~OnReadComplete() override {}
assert(pg->in_progress_async_reads.size());
assert(pg->in_progress_async_reads.front().second == this);
pg->in_progress_async_reads.pop_front();
- pg->complete_read_ctx(async_read_result, this);
+
+ // Restart the op context now that all reads have been
+ // completed. Read failures will be handled by the op finisher
+ pg->execute_ctx(this);
}
}
-class CopyFromCallback: public PrimaryLogPG::CopyCallback {
+class CopyFromCallback : public PrimaryLogPG::CopyCallback {
public:
- PrimaryLogPG::CopyResults *results;
- int retval;
+ PrimaryLogPG::CopyResults *results = nullptr;
PrimaryLogPG::OpContext *ctx;
- explicit CopyFromCallback(PrimaryLogPG::OpContext *ctx_)
- : results(NULL),
- retval(0),
- ctx(ctx_) {}
+ OSDOp &osd_op;
+
+ CopyFromCallback(PrimaryLogPG::OpContext *ctx, OSDOp &osd_op)
+ : ctx(ctx), osd_op(osd_op) {
+ }
~CopyFromCallback() override {}
void finish(PrimaryLogPG::CopyCallbackResults results_) override {
results = results_.get<1>();
int r = results_.get<0>();
- retval = r;
// for finish_copyfrom
ctx->user_at_version = results->user_version;
if (r >= 0) {
ctx->pg->execute_ctx(ctx);
- }
- ctx->copy_cb = NULL;
- if (r < 0) {
+ } else {
if (r != -ECANCELED) { // on cancel just toss it out; client resends
if (ctx->op)
ctx->pg->osd->reply_op_error(ctx->op, r);
uint64_t get_data_size() {
return results->object_size;
}
- int get_result() {
- return retval;
+};
+
+// OpFinisher for CEPH_OSD_OP_COPY_FROM: stashed in ctx->op_finishers when
+// the async copy is started, and invoked when do_osd_ops re-executes the
+// op after the copy completes.  execute() hands the callback (which holds
+// the CopyResults) to finish_copyfrom.
+struct CopyFromFinisher : public PrimaryLogPG::OpFinisher {
+ CopyFromCallback *copy_from_callback;
+
+ CopyFromFinisher(CopyFromCallback *copy_from_callback)
+ : copy_from_callback(copy_from_callback) {
+ }
+
+ int execute() override {
+ // instance will be destructed after this method completes
+ copy_from_callback->ctx->pg->finish_copyfrom(copy_from_callback);
+ // Always 0: the ctx is only re-executed on copy success -- failures are
+ // replied to (or tossed on -ECANCELED) in CopyFromCallback::finish.
+ return 0;
+ }
+};
const hobject_t &hoid,
const ObjectRecoveryInfo &_recovery_info,
ObjectContextRef obc,
+ bool is_delete,
ObjectStore::Transaction *t
)
{
ObjectRecoveryInfo recovery_info(_recovery_info);
clear_object_snap_mapping(t, hoid);
- if (recovery_info.soid.is_snap()) {
+ if (!is_delete && recovery_info.soid.is_snap()) {
OSDriver::OSTransaction _t(osdriver.get_transaction(t));
set<snapid_t> snaps;
dout(20) << " snapset " << recovery_info.ss
snaps,
&_t);
}
- if (pg_log.get_missing().is_missing(recovery_info.soid) &&
+ if (!is_delete && pg_log.get_missing().is_missing(recovery_info.soid) &&
pg_log.get_missing().get_items().find(recovery_info.soid)->second.need > recovery_info.version) {
assert(is_primary());
const pg_log_entry_t *latest = pg_log.get_log().objects.find(recovery_info.soid)->second;
recover_got(recovery_info.soid, recovery_info.version);
if (is_primary()) {
- assert(obc);
- obc->obs.exists = true;
- obc->ondisk_write_lock();
-
- bool got = obc->get_recovery_read();
- assert(got);
+ if (!is_delete) {
+ obc->obs.exists = true;
+ obc->ondisk_write_lock();
- assert(recovering.count(obc->obs.oi.soid));
- recovering[obc->obs.oi.soid] = obc;
- obc->obs.oi = recovery_info.oi; // may have been updated above
+ bool got = obc->get_recovery_read();
+ assert(got);
+ assert(recovering.count(obc->obs.oi.soid));
+ recovering[obc->obs.oi.soid] = obc;
+ obc->obs.oi = recovery_info.oi; // may have been updated above
+ t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc));
+ }
t->register_on_applied(new C_OSD_AppliedRecoveredObject(this, obc));
- t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc));
publish_stats_to_osd();
assert(missing_loc.needs_recovery(hoid));
- missing_loc.add_location(hoid, pg_whoami);
+ if (!is_delete)
+ missing_loc.add_location(hoid, pg_whoami);
release_backoffs(hoid);
if (!is_unreadable_object(hoid)) {
auto unreadable_object_entry = waiting_for_unreadable_object.find(hoid);
void PrimaryLogPG::on_global_recover(
const hobject_t &soid,
- const object_stat_sum_t &stat_diff)
+ const object_stat_sum_t &stat_diff,
+ bool is_delete)
{
info.stats.stats.sum.add(stat_diff);
missing_loc.recovered(soid);
map<hobject_t, ObjectContextRef>::iterator i = recovering.find(soid);
assert(i != recovering.end());
- // recover missing won't have had an obc, but it gets filled in
- // during on_local_recover
- assert(i->second);
- list<OpRequestRef> requeue_list;
- i->second->drop_recovery_read(&requeue_list);
- requeue_ops(requeue_list);
+ if (!is_delete) {
+ // recover missing won't have had an obc, but it gets filled in
+ // during on_local_recover
+ assert(i->second);
+ list<OpRequestRef> requeue_list;
+ i->second->drop_recovery_read(&requeue_list);
+ requeue_ops(requeue_list);
+ }
backfills_in_flight.erase(soid);
PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
if (is_missing_object(soid)) {
recover_missing(soid, v, cct->_conf->osd_client_op_priority, h);
+ } else if (missing_loc.is_deleted(soid)) {
+ prep_object_replica_deletes(soid, v, h);
} else {
prep_object_replica_pushes(soid, v, h);
}
ConnectionRef con,
ceph_tid_t tid)
{
- const pg_missing_t &missing = pg_log.get_missing();
+ const auto &missing = pg_log.get_missing();
string prefix;
string format;
if (candidate.get_namespace() == cct->_conf->osd_hit_set_namespace)
continue;
+ if (missing_loc.is_deleted(candidate))
+ continue;
+
// skip wrong namespace
if (m->get_hobj().nspace != librados::all_nspaces &&
candidate.get_namespace() != m->get_hobj().nspace)
if (candidate.get_namespace() != m->get_hobj().nspace)
continue;
+ if (missing_loc.is_deleted(candidate))
+ continue;
+
if (filter && !pgls_filter(filter, candidate, filter_out))
continue;
}
}
- OpContext *ctx = new OpContext(op, m->get_reqid(), m->ops, obc, this);
+ OpContext *ctx = new OpContext(op, m->get_reqid(), &m->ops, obc, this);
if (!obc->obs.exists)
ctx->snapset_obc = get_object_context(obc->obs.oi.soid.get_snapdir(), false);
case CEPH_OSD_OP_SYNC_READ:
case CEPH_OSD_OP_SPARSE_READ:
case CEPH_OSD_OP_CHECKSUM:
+ case CEPH_OSD_OP_CMPEXT:
op.flags = (op.flags | CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL) &
~(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_FADVISE_NOCACHE);
}
osd->logger->inc(l_osd_tier_proxy_read);
const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
- OpContext *ctx = new OpContext(op, m->get_reqid(), prdop->ops, this);
+ OpContext *ctx = new OpContext(op, m->get_reqid(), &prdop->ops, this);
ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false);
ctx->user_at_version = prdop->user_version;
ctx->data_off = prdop->data_offset;
dout(10) << __func__ << " Start proxy write for " << *m << dendl;
ProxyWriteOpRef pwop(std::make_shared<ProxyWriteOp>(op, soid, m->ops, m->get_reqid()));
- pwop->ctx = new OpContext(op, m->get_reqid(), pwop->ops, this);
+ pwop->ctx = new OpContext(op, m->get_reqid(), &pwop->ops, this);
pwop->mtime = m->get_mtime();
ObjectOperation obj_op;
ctx->at_version = get_next_version();
ctx->mtime = m->get_mtime();
- dout(10) << __func__ << " " << soid << " " << ctx->ops
+ dout(10) << __func__ << " " << soid << " " << *ctx->ops
<< " ov " << obc->obs.oi.version << " av " << ctx->at_version
<< " snapc " << ctx->snapc
<< " snapset " << obc->ssc->snapset
<< dendl;
} else {
- dout(10) << __func__ << " " << soid << " " << ctx->ops
+ dout(10) << __func__ << " " << soid << " " << *ctx->ops
<< " ov " << obc->obs.oi.version
<< dendl;
}
obc->ondisk_read_unlock();
}
- if (result == -EINPROGRESS) {
+ bool pending_async_reads = !ctx->pending_async_reads.empty();
+ if (result == -EINPROGRESS || pending_async_reads) {
// come back later.
+ if (pending_async_reads) {
+ in_progress_async_reads.push_back(make_pair(op, ctx));
+ ctx->start_async_reads(this);
+ }
return;
}
if (result >= 0)
do_osd_op_effects(ctx, m->get_connection());
- if (ctx->pending_async_reads.empty()) {
- complete_read_ctx(result, ctx);
- } else {
- in_progress_async_reads.push_back(make_pair(op, ctx));
- ctx->start_async_reads(this);
- }
-
+ complete_read_ctx(result, ctx);
return;
}
// save just what we need from ctx
MOSDOpReply *reply = ctx->reply;
ctx->reply = nullptr;
- reply->claim_op_out_data(ctx->ops);
- reply->get_header().data_off = ctx->data_off;
+ reply->claim_op_out_data(*ctx->ops);
+ reply->get_header().data_off = (ctx->data_off ? *ctx->data_off : 0);
close_op_ctx(ctx);
if (result == -ENOENT) {
repop->put();
}
+// Tear down an OpContext once no further processing will happen on it:
+// release its object locks, drop the (possibly unsubmitted) transaction,
+// run and discard every registered on_finish callback, then free the ctx.
+void PrimaryLogPG::close_op_ctx(OpContext *ctx) {
+ release_object_locks(ctx->lock_manager);
+
+ ctx->op_t.reset();
+
+ // erase-as-we-advance: each callback is unlinked from the list before
+ // the next one runs, so the list is empty when the loop exits
+ for (auto p = ctx->on_finish.begin(); p != ctx->on_finish.end();
+ ctx->on_finish.erase(p++)) {
+ (*p)();
+ }
+ delete ctx;
+}
+
void PrimaryLogPG::reply_ctx(OpContext *ctx, int r)
{
if (ctx->op)
} else {
auto p = snapset.clone_snaps.find(coid.snap);
if (p == snapset.clone_snaps.end()) {
- osd->clog->error() << __func__ << " No clone_snaps in snapset " << snapset
- << " for " << coid << "\n";
+ osd->clog->error() << "No clone_snaps in snapset " << snapset
+ << " for object " << coid << "\n";
return -ENOENT;
}
old_snaps.insert(snapset.clone_snaps[coid.snap].begin(),
snapset.clone_snaps[coid.snap].end());
}
if (old_snaps.empty()) {
- osd->clog->error() << __func__ << " No object info snaps for " << coid;
+ osd->clog->error() << "No object info snaps for object " << coid;
return -ENOENT;
}
dout(10) << coid << " old_snaps " << old_snaps
<< " old snapset " << snapset << dendl;
if (snapset.seq == 0) {
- osd->clog->error() << __func__ << " No snapset.seq for " << coid;
+ osd->clog->error() << "No snapset.seq for object " << coid;
return -ENOENT;
}
if (new_snaps.empty()) {
p = std::find(snapset.clones.begin(), snapset.clones.end(), coid.snap);
if (p == snapset.clones.end()) {
- osd->clog->error() << __func__ << " Snap " << coid.snap << " not in clones";
+ osd->clog->error() << "Snap " << coid.snap << " not in clones";
return -ENOENT;
}
}
}
}
-int PrimaryLogPG::do_extent_cmp(OpContext *ctx, OSDOp& osd_op)
-{
- ceph_osd_op& op = osd_op.op;
- vector<OSDOp> read_ops(1);
- OSDOp& read_op = read_ops[0];
- int result = 0;
-
- read_op.op.op = CEPH_OSD_OP_SYNC_READ;
- read_op.op.extent.offset = op.extent.offset;
- read_op.op.extent.length = op.extent.length;
- read_op.op.extent.truncate_seq = op.extent.truncate_seq;
- read_op.op.extent.truncate_size = op.extent.truncate_size;
-
- result = do_osd_ops(ctx, read_ops);
- if (result < 0) {
- derr << "do_extent_cmp do_osd_ops failed " << result << dendl;
- return result;
- }
-
- if (read_op.outdata.length() != osd_op.indata.length())
- return -EINVAL;
-
- for (uint64_t p = 0; p < osd_op.indata.length(); p++) {
- if (read_op.outdata[p] != osd_op.indata[p]) {
- return (-MAX_ERRNO - p);
- }
- }
-
- return result;
-}
-
int PrimaryLogPG::do_writesame(OpContext *ctx, OSDOp& osd_op)
{
ceph_osd_op& op = osd_op.op;
r(r), rval(rv), outdatap(blp), maybe_crc(mc),
size(size), osd(osd), soid(soid), flags(flags) {}
void finish(int len) override {
- *rval = len;
*r = len;
- if (len < 0)
+ if (len < 0) {
+ *rval = len;
return;
+ }
+ *rval = 0;
+
// whole object? can we verify the checksum?
if (maybe_crc && *r == size) {
uint32_t crc = outdatap->crc32c(-1);
};
struct ToSparseReadResult : public Context {
- bufferlist& data_bl;
+ int* result;
+ bufferlist* data_bl;
uint64_t data_offset;
- ceph_le64& len;
- ToSparseReadResult(bufferlist& bl, uint64_t offset, ceph_le64& len):
- data_bl(bl), data_offset(offset),len(len) {}
+ ceph_le64* len;
+ ToSparseReadResult(int* result, bufferlist* bl, uint64_t offset,
+ ceph_le64* len)
+ : result(result), data_bl(bl), data_offset(offset),len(len) {}
void finish(int r) override {
- if (r < 0) return;
- len = r;
+ if (r < 0) {
+ *result = r;
+ return;
+ }
+ *result = 0;
+ *len = r;
bufferlist outdata;
map<uint64_t, uint64_t> extents = {{data_offset, r}};
::encode(extents, outdata);
- ::encode_destructively(data_bl, outdata);
- data_bl.swap(outdata);
+ ::encode_destructively(*data_bl, outdata);
+ data_bl->swap(outdata);
}
};
}
}
+// OpFinisher for async reads (READ / SPARSE_READ / CHECKSUM / CMPEXT):
+// when the op is re-executed after the pending reads complete, simply
+// surface the result that the read-completion context (e.g.
+// FillInVerifyExtent, ToSparseReadResult) stored into osd_op.rval.
+struct ReadFinisher : public PrimaryLogPG::OpFinisher {
+ OSDOp& osd_op;
+
+ ReadFinisher(OSDOp& osd_op) : osd_op(osd_op) {
+ }
+
+ int execute() override {
+ return osd_op.rval;
+ }
+};
+
struct C_ChecksumRead : public Context {
PrimaryLogPG *primary_log_pg;
OSDOp &osd_op;
&read_bl, maybe_crc, size,
osd, soid, flags)) {
}
+ ~C_ChecksumRead() override {
+ delete fill_extent_ctx;
+ }
void finish(int r) override {
fill_extent_ctx->complete(r);
+ fill_extent_ctx = nullptr;
if (osd_op.rval >= 0) {
bufferlist::iterator init_value_bl_it = init_value_bl.begin();
osd_op.rval = primary_log_pg->finish_checksum(osd_op, csum_type,
- &init_value_bl_it,
- read_bl);
+ &init_value_bl_it, read_bl);
}
}
};
int PrimaryLogPG::do_checksum(OpContext *ctx, OSDOp& osd_op,
- bufferlist::iterator *bl_it, bool *async_read)
+ bufferlist::iterator *bl_it)
{
dout(20) << __func__ << dendl;
auto checksum_ctx = new C_ChecksumRead(this, osd_op, csum_type,
std::move(init_value_bl), maybe_crc,
oi.size, osd, soid, op.flags);
+
ctx->pending_async_reads.push_back({
{op.checksum.offset, op.checksum.length, op.flags},
{&checksum_ctx->read_bl, checksum_ctx}});
dout(10) << __func__ << ": async_read noted for " << soid << dendl;
- *async_read = true;
- return 0;
+ ctx->op_finishers[ctx->current_osd_subop_num].reset(
+ new ReadFinisher(osd_op));
+ return -EINPROGRESS;
}
// sync read
- *async_read = false;
std::vector<OSDOp> read_ops(1);
auto& read_op = read_ops[0];
if (op.checksum.length > 0) {
return 0;
}
+// Completion context for the async-read half of CMPEXT on EC pools.
+// The read lands in read_bl (validated/trimmed by FillInVerifyExtent),
+// after which the byte comparison is performed via finish_extent_cmp
+// and the outcome stored in osd_op.rval.
+struct C_ExtentCmpRead : public Context {
+ PrimaryLogPG *primary_log_pg;
+ OSDOp &osd_op;
+ ceph_le64 read_length;
+ bufferlist read_bl;
+ Context *fill_extent_ctx;
+
+ C_ExtentCmpRead(PrimaryLogPG *primary_log_pg, OSDOp &osd_op,
+ boost::optional<uint32_t> maybe_crc, uint64_t size,
+ OSDService *osd, hobject_t soid, __le32 flags)
+ : primary_log_pg(primary_log_pg), osd_op(osd_op),
+ fill_extent_ctx(new FillInVerifyExtent(&read_length, &osd_op.rval,
+ &read_bl, maybe_crc, size,
+ osd, soid, flags)) {
+ }
+ ~C_ExtentCmpRead() override {
+ // no-op if finish() already consumed/freed it (set to nullptr there)
+ delete fill_extent_ctx;
+ }
+
+ void finish(int r) override {
+ if (r == -ENOENT) {
+ // object vanished: treat as an empty read so the compare below runs
+ // against an all-zero extent
+ osd_op.rval = 0;
+ read_bl.clear();
+ delete fill_extent_ctx;
+ } else {
+ // complete() is expected to consume fill_extent_ctx; it sets
+ // osd_op.rval (0 on success, error on short/failed read)
+ fill_extent_ctx->complete(r);
+ }
+ // cleared in both branches so the destructor does not double-delete
+ fill_extent_ctx = nullptr;
+
+ if (osd_op.rval >= 0) {
+ osd_op.rval = primary_log_pg->finish_extent_cmp(osd_op, read_bl);
+ }
+ }
+};
+
+// CEPH_OSD_OP_CMPEXT: compare osd_op.indata against the object's on-disk
+// extent.  Three paths:
+//   1. object absent/whiteout -> compare against an empty (all-zero) extent;
+//   2. require_rollback() pools (presumably EC -- confirm) -> queue an async
+//      read plus a ReadFinisher and return -EINPROGRESS so the op is
+//      re-executed once the read lands;
+//   3. otherwise -> synchronous read via a nested SYNC_READ sub-op.
+// Returns 0 on match, -EINPROGRESS for the async path, a negative errno on
+// read failure, or -MAX_ERRNO - offset encoding the first mismatching byte.
+int PrimaryLogPG::do_extent_cmp(OpContext *ctx, OSDOp& osd_op)
+{
+ dout(20) << __func__ << dendl;
+ ceph_osd_op& op = osd_op.op;
+
+ if (!ctx->obs->exists || ctx->obs->oi.is_whiteout()) {
+ dout(20) << __func__ << " object DNE" << dendl;
+ return finish_extent_cmp(osd_op, {});
+ } else if (pool.info.require_rollback()) {
+ // If there is a data digest and it is possible we are reading
+ // entire object, pass the digest.
+ auto& oi = ctx->new_obs.oi;
+ boost::optional<uint32_t> maybe_crc;
+ // NOTE(review): op.checksum.{offset,length} here vs op.extent.* for the
+ // read below -- these appear to alias within the ceph_osd_op union, but
+ // confirm against rados.h
+ if (oi.is_data_digest() && op.checksum.offset == 0 &&
+ op.checksum.length >= oi.size) {
+ maybe_crc = oi.data_digest;
+ }
+
+ // async read
+ auto& soid = oi.soid;
+ auto extent_cmp_ctx = new C_ExtentCmpRead(this, osd_op, maybe_crc, oi.size,
+ osd, soid, op.flags);
+ ctx->pending_async_reads.push_back({
+ {op.extent.offset, op.extent.length, op.flags},
+ {&extent_cmp_ctx->read_bl, extent_cmp_ctx}});
+
+ dout(10) << __func__ << ": async_read noted for " << soid << dendl;
+
+ // re-execution after the read will pick up the rval via ReadFinisher
+ ctx->op_finishers[ctx->current_osd_subop_num].reset(
+ new ReadFinisher(osd_op));
+ return -EINPROGRESS;
+ }
+
+ // sync read
+ vector<OSDOp> read_ops(1);
+ OSDOp& read_op = read_ops[0];
+
+ read_op.op.op = CEPH_OSD_OP_SYNC_READ;
+ read_op.op.extent.offset = op.extent.offset;
+ read_op.op.extent.length = op.extent.length;
+ read_op.op.extent.truncate_seq = op.extent.truncate_seq;
+ read_op.op.extent.truncate_size = op.extent.truncate_size;
+
+ int result = do_osd_ops(ctx, read_ops);
+ if (result < 0) {
+ derr << __func__ << " failed " << result << dendl;
+ return result;
+ }
+ return finish_extent_cmp(osd_op, read_op.outdata);
+}
+
+// Byte-compare the client-supplied data (osd_op.indata) against what was
+// read from disk.  Bytes beyond the end of read_bl compare as zero, so a
+// short or empty read behaves like a zero-filled extent.  Returns 0 on a
+// full match, or -MAX_ERRNO - idx, encoding the offset of the first
+// mismatching byte in the (negative) return value.
+int PrimaryLogPG::finish_extent_cmp(OSDOp& osd_op, const bufferlist &read_bl)
+{
+ for (uint64_t idx = 0; idx < osd_op.indata.length(); ++idx) {
+ char read_byte = (idx < read_bl.length() ? read_bl[idx] : 0);
+ if (osd_op.indata[idx] != read_byte) {
+ return (-MAX_ERRNO - idx);
+ }
+ }
+
+ return 0;
+}
+
+// CEPH_OSD_OP_READ / SYNC_READ: read op.extent.{offset,length} from the
+// object.  Applies truncate_seq/truncate_size trimming and clamps the
+// request to the object size, then either queues an async read (pools
+// with require_rollback(), via FillInVerifyExtent + ReadFinisher) or does
+// a synchronous backend read with full-object CRC verification.  Updates
+// read stats in ctx->delta_stats.  Returns 0 on success (async result is
+// surfaced later through ReadFinisher), negative errno on failure.
+int PrimaryLogPG::do_read(OpContext *ctx, OSDOp& osd_op) {
+ dout(20) << __func__ << dendl;
+ auto& op = osd_op.op;
+ auto& oi = ctx->new_obs.oi;
+ auto& soid = oi.soid;
+ __u32 seq = oi.truncate_seq;
+ uint64_t size = oi.size;
+ bool trimmed_read = false;
+
+ // are we beyond truncate_size?
+ if ( (seq < op.extent.truncate_seq) &&
+ (op.extent.offset + op.extent.length > op.extent.truncate_size) )
+ size = op.extent.truncate_size;
+
+ if (op.extent.length == 0) //length is zero mean read the whole object
+ op.extent.length = size;
+
+ if (op.extent.offset >= size) {
+ op.extent.length = 0;
+ trimmed_read = true;
+ } else if (op.extent.offset + op.extent.length > size) {
+ op.extent.length = size - op.extent.offset;
+ trimmed_read = true;
+ }
+
+ // read into a buffer
+ int result = 0;
+ if (trimmed_read && op.extent.length == 0) {
+ // read size was trimmed to zero and it is expected to do nothing
+ // a read operation of 0 bytes does *not* do nothing, this is why
+ // the trimmed_read boolean is needed
+ } else if (pool.info.require_rollback()) {
+ boost::optional<uint32_t> maybe_crc;
+ // If there is a data digest and it is possible we are reading
+ // entire object, pass the digest. FillInVerifyExtent will
+ // will check the oi.size again.
+ if (oi.is_data_digest() && op.extent.offset == 0 &&
+ op.extent.length >= oi.size)
+ maybe_crc = oi.data_digest;
+ ctx->pending_async_reads.push_back(
+ make_pair(
+ boost::make_tuple(op.extent.offset, op.extent.length, op.flags),
+ make_pair(&osd_op.outdata,
+ new FillInVerifyExtent(&op.extent.length, &osd_op.rval,
+ &osd_op.outdata, maybe_crc, oi.size,
+ osd, soid, op.flags))));
+ dout(10) << " async_read noted for " << soid << dendl;
+
+ ctx->op_finishers[ctx->current_osd_subop_num].reset(
+ new ReadFinisher(osd_op));
+ } else {
+ int r = pgbackend->objects_read_sync(
+ soid, op.extent.offset, op.extent.length, op.flags, &osd_op.outdata);
+ if (r == -EIO) {
+ // attempt repair from a replica before giving up
+ r = rep_repair_primary_object(soid, ctx->op);
+ }
+ if (r >= 0)
+ op.extent.length = r;
+ else {
+ result = r;
+ op.extent.length = 0;
+ }
+ dout(10) << " read got " << r << " / " << op.extent.length
+ << " bytes from obj " << soid << dendl;
+
+ // whole object? can we verify the checksum?
+ if (op.extent.length == oi.size && oi.is_data_digest()) {
+ uint32_t crc = osd_op.outdata.crc32c(-1);
+ if (oi.data_digest != crc) {
+ osd->clog->error() << info.pgid << std::hex
+ << " full-object read crc 0x" << crc
+ << " != expected 0x" << oi.data_digest
+ << std::dec << " on " << soid;
+ // FIXME fall back to replica or something?
+ result = -EIO;
+ }
+ }
+ }
+
+ // XXX the op.extent.length is the requested length for async read
+ // On error this length is changed to 0 after the error comes back.
+ ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
+ ctx->delta_stats.num_rd++;
+ return result;
+}
+
+// CEPH_OSD_OP_SPARSE_READ: return the object's allocated extents plus their
+// data, encoded as map<offset,len> followed by the concatenated data.
+// Truncation sequences are rejected (-EINVAL).  On EC pools the sparse read
+// is translated to a plain async read (ToSparseReadResult builds the
+// extent-map encoding on completion); otherwise the extents come from a
+// synchronous fiemap plus per-extent backend reads, with optional hole
+// verification and a full-object CRC check.
+int PrimaryLogPG::do_sparse_read(OpContext *ctx, OSDOp& osd_op) {
+ dout(20) << __func__ << dendl;
+ auto& op = osd_op.op;
+ auto& oi = ctx->new_obs.oi;
+ auto& soid = oi.soid;
+
+ if (op.extent.truncate_seq) {
+ dout(0) << "sparse_read does not support truncation sequence " << dendl;
+ return -EINVAL;
+ }
+
+ ++ctx->num_read;
+ if (pool.info.ec_pool()) {
+ // translate sparse read to a normal one if not supported
+ uint64_t offset = op.extent.offset;
+ uint64_t length = op.extent.length;
+ // clamp the request to the object size
+ if (offset > oi.size) {
+ length = 0;
+ } else if (offset + length > oi.size) {
+ length = oi.size - offset;
+ }
+
+ if (length > 0) {
+ ctx->pending_async_reads.push_back(
+ make_pair(
+ boost::make_tuple(offset, length, op.flags),
+ make_pair(
+ &osd_op.outdata,
+ new ToSparseReadResult(&osd_op.rval, &osd_op.outdata, offset,
+ &op.extent.length))));
+ dout(10) << " async_read (was sparse_read) noted for " << soid << dendl;
+
+ // result surfaced on re-execution via ReadFinisher
+ ctx->op_finishers[ctx->current_osd_subop_num].reset(
+ new ReadFinisher(osd_op));
+ } else {
+ dout(10) << " sparse read ended up empty for " << soid << dendl;
+ // encode an empty extent map so the reply is well-formed
+ map<uint64_t, uint64_t> extents;
+ ::encode(extents, osd_op.outdata);
+ }
+ } else {
+ // read into a buffer
+ map<uint64_t, uint64_t> m;
+ uint32_t total_read = 0;
+ int r = osd->store->fiemap(ch, ghobject_t(soid, ghobject_t::NO_GEN,
+ info.pgid.shard),
+ op.extent.offset, op.extent.length, m);
+ if (r < 0) {
+ return r;
+ }
+
+ map<uint64_t, uint64_t>::iterator miter;
+ bufferlist data_bl;
+ uint64_t last = op.extent.offset;
+ for (miter = m.begin(); miter != m.end(); ++miter) {
+ // verify hole?
+ if (cct->_conf->osd_verify_sparse_read_holes &&
+ last < miter->first) {
+ bufferlist t;
+ uint64_t len = miter->first - last;
+ r = pgbackend->objects_read_sync(soid, last, len, op.flags, &t);
+ if (r == -EIO) {
+ r = rep_repair_primary_object(soid, ctx->op);
+ }
+ // hole-verification failures are logged but do not fail the op
+ if (r < 0) {
+ osd->clog->error() << coll << " " << soid
+ << " sparse-read failed to read: "
+ << r;
+ } else if (!t.is_zero()) {
+ osd->clog->error() << coll << " " << soid
+ << " sparse-read found data in hole "
+ << last << "~" << len;
+ }
+ }
+
+ bufferlist tmpbl;
+ r = pgbackend->objects_read_sync(soid, miter->first, miter->second,
+ op.flags, &tmpbl);
+ if (r < 0) {
+ return r;
+ }
+
+ // this is usually happen when we get extent that exceeds the actual file
+ // size
+ if (r < (int)miter->second)
+ miter->second = r;
+ total_read += r;
+ dout(10) << "sparse-read " << miter->first << "@" << miter->second
+ << dendl;
+ data_bl.claim_append(tmpbl);
+ last = miter->first + r;
+ }
+
+ // NOTE(review): appears redundant -- every negative r above has already
+ // returned, and the fiemap result was checked before the loop
+ if (r < 0) {
+ return r;
+ }
+
+ // verify trailing hole?
+ if (cct->_conf->osd_verify_sparse_read_holes) {
+ uint64_t end = MIN(op.extent.offset + op.extent.length, oi.size);
+ if (last < end) {
+ bufferlist t;
+ uint64_t len = end - last;
+ r = pgbackend->objects_read_sync(soid, last, len, op.flags, &t);
+ if (r < 0) {
+ osd->clog->error() << coll << " " << soid
+ << " sparse-read failed to read: " << r;
+ } else if (!t.is_zero()) {
+ osd->clog->error() << coll << " " << soid
+ << " sparse-read found data in hole "
+ << last << "~" << len;
+ }
+ }
+ }
+
+ // Why SPARSE_READ need checksum? In fact, librbd always use sparse-read.
+ // Maybe at first, there is no much whole objects. With continued use, more
+ // and more whole object exist. So from this point, for spare-read add
+ // checksum make sense.
+ if (total_read == oi.size && oi.is_data_digest()) {
+ uint32_t crc = data_bl.crc32c(-1);
+ if (oi.data_digest != crc) {
+ osd->clog->error() << info.pgid << std::hex
+ << " full-object read crc 0x" << crc
+ << " != expected 0x" << oi.data_digest
+ << std::dec << " on " << soid;
+ // FIXME fall back to replica or something?
+ return -EIO;
+ }
+ }
+
+ op.extent.length = total_read;
+
+ ::encode(m, osd_op.outdata); // re-encode since it might be modified
+ ::encode_destructively(data_bl, osd_op.outdata);
+
+ dout(10) << " sparse_read got " << total_read << " bytes from object "
+ << soid << dendl;
+ }
+
+ ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
+ ctx->delta_stats.num_rd++;
+ return 0;
+}
+
int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
{
int result = 0;
object_info_t& oi = obs.oi;
const hobject_t& soid = oi.soid;
- bool first_read = true;
-
PGTransaction* t = ctx->op_t.get();
dout(10) << "do_osd_op " << soid << " " << ops << dendl;
+ ctx->current_osd_subop_num = 0;
for (vector<OSDOp>::iterator p = ops.begin(); p != ops.end(); ++p, ctx->current_osd_subop_num++) {
OSDOp& osd_op = *p;
ceph_osd_op& op = osd_op.op;
+ OpFinisher* op_finisher = nullptr;
+ {
+ auto op_finisher_it = ctx->op_finishers.find(ctx->current_osd_subop_num);
+ if (op_finisher_it != ctx->op_finishers.end()) {
+ op_finisher = op_finisher_it->second.get();
+ }
+ }
+
// TODO: check endianness (__le32 vs uint32_t, etc.)
// The fields in ceph_osd_op are little-endian (according to the definition in rados.h),
// but the code in this function seems to treat them as native-endian. What should the
case CEPH_OSD_OP_CMPEXT:
++ctx->num_read;
- tracepoint(osd, do_osd_op_pre_extent_cmp, soid.oid.name.c_str(), soid.snap.val, oi.size, oi.truncate_seq, op.extent.offset, op.extent.length, op.extent.truncate_size, op.extent.truncate_seq);
- result = do_extent_cmp(ctx, osd_op);
+ tracepoint(osd, do_osd_op_pre_extent_cmp, soid.oid.name.c_str(),
+ soid.snap.val, oi.size, oi.truncate_seq, op.extent.offset,
+ op.extent.length, op.extent.truncate_size,
+ op.extent.truncate_seq);
+
+ if (op_finisher == nullptr) {
+ result = do_extent_cmp(ctx, osd_op);
+ } else {
+ result = op_finisher->execute();
+ }
break;
case CEPH_OSD_OP_SYNC_READ:
// fall through
case CEPH_OSD_OP_READ:
++ctx->num_read;
- {
- __u32 seq = oi.truncate_seq;
- uint64_t size = oi.size;
- tracepoint(osd, do_osd_op_pre_read, soid.oid.name.c_str(), soid.snap.val, size, seq, op.extent.offset, op.extent.length, op.extent.truncate_size, op.extent.truncate_seq);
- bool trimmed_read = false;
- // are we beyond truncate_size?
- if ( (seq < op.extent.truncate_seq) &&
- (op.extent.offset + op.extent.length > op.extent.truncate_size) )
- size = op.extent.truncate_size;
-
- if (op.extent.length == 0) //length is zero mean read the whole object
- op.extent.length = size;
-
- if (op.extent.offset >= size) {
- op.extent.length = 0;
- trimmed_read = true;
- } else if (op.extent.offset + op.extent.length > size) {
- op.extent.length = size - op.extent.offset;
- trimmed_read = true;
- }
-
- // read into a buffer
- bool async = false;
- if (trimmed_read && op.extent.length == 0) {
- // read size was trimmed to zero and it is expected to do nothing
- // a read operation of 0 bytes does *not* do nothing, this is why
- // the trimmed_read boolean is needed
- } else if (pool.info.require_rollback()) {
- async = true;
- boost::optional<uint32_t> maybe_crc;
- // If there is a data digest and it is possible we are reading
- // entire object, pass the digest. FillInVerifyExtent will
- // will check the oi.size again.
- if (oi.is_data_digest() && op.extent.offset == 0 &&
- op.extent.length >= oi.size)
- maybe_crc = oi.data_digest;
- ctx->pending_async_reads.push_back(
- make_pair(
- boost::make_tuple(op.extent.offset, op.extent.length, op.flags),
- make_pair(&osd_op.outdata,
- new FillInVerifyExtent(&op.extent.length, &osd_op.rval,
- &osd_op.outdata, maybe_crc, oi.size, osd,
- soid, op.flags))));
- dout(10) << " async_read noted for " << soid << dendl;
- } else {
- int r = pgbackend->objects_read_sync(
- soid, op.extent.offset, op.extent.length, op.flags, &osd_op.outdata);
- if (r == -EIO) {
- r = rep_repair_primary_object(soid, ctx->op);
- }
- if (r >= 0)
- op.extent.length = r;
- else {
- result = r;
- op.extent.length = 0;
- }
- dout(10) << " read got " << r << " / " << op.extent.length
- << " bytes from obj " << soid << dendl;
-
- // whole object? can we verify the checksum?
- if (op.extent.length == oi.size && oi.is_data_digest()) {
- uint32_t crc = osd_op.outdata.crc32c(-1);
- if (oi.data_digest != crc) {
- osd->clog->error() << info.pgid << std::hex
- << " full-object read crc 0x" << crc
- << " != expected 0x" << oi.data_digest
- << std::dec << " on " << soid;
- // FIXME fall back to replica or something?
- result = -EIO;
- }
- }
- }
- if (first_read) {
- first_read = false;
+ tracepoint(osd, do_osd_op_pre_read, soid.oid.name.c_str(),
+ soid.snap.val, oi.size, oi.truncate_seq, op.extent.offset,
+ op.extent.length, op.extent.truncate_size,
+ op.extent.truncate_seq);
+ if (op_finisher == nullptr) {
+ if (!ctx->data_off) {
ctx->data_off = op.extent.offset;
}
- // XXX the op.extent.length is the requested length for async read
- // On error this length is changed to 0 after the error comes back.
- ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
- ctx->delta_stats.num_rd++;
-
- // Skip checking the result and just proceed to the next operation
- if (async)
- continue;
-
+ result = do_read(ctx, osd_op);
+ } else {
+ result = op_finisher->execute();
}
break;
op.checksum.offset, op.checksum.length,
op.checksum.chunk_size);
- bool async_read;
- result = do_checksum(ctx, osd_op, &bp, &async_read);
- if (result == 0 && async_read) {
- continue;
+ if (op_finisher == nullptr) {
+ result = do_checksum(ctx, osd_op, &bp);
+ } else {
+ result = op_finisher->execute();
}
}
break;
/* map extents */
case CEPH_OSD_OP_SPARSE_READ:
- tracepoint(osd, do_osd_op_pre_sparse_read, soid.oid.name.c_str(), soid.snap.val, oi.size, oi.truncate_seq, op.extent.offset, op.extent.length, op.extent.truncate_size, op.extent.truncate_seq);
- if (op.extent.truncate_seq) {
- dout(0) << "sparse_read does not support truncation sequence " << dendl;
- result = -EINVAL;
- break;
- }
- ++ctx->num_read;
- if (pool.info.ec_pool()) {
- // translate sparse read to a normal one if not supported
- uint64_t offset = op.extent.offset;
- uint64_t length = op.extent.length;
- if (offset > oi.size) {
- length = 0;
- } else if (offset + length > oi.size) {
- length = oi.size - offset;
- }
- if (length > 0) {
- ctx->pending_async_reads.push_back(
- make_pair(
- boost::make_tuple(offset, length, op.flags),
- make_pair(
- &osd_op.outdata,
- new ToSparseReadResult(
- osd_op.outdata, offset,
- op.extent.length /* updated by the callback */))));
- dout(10) << " async_read (was sparse_read) noted for " << soid << dendl;
- } else {
- dout(10) << " sparse read ended up empty for " << soid << dendl;
- map<uint64_t, uint64_t> extents;
- ::encode(extents, osd_op.outdata);
- }
+ tracepoint(osd, do_osd_op_pre_sparse_read, soid.oid.name.c_str(),
+ soid.snap.val, oi.size, oi.truncate_seq, op.extent.offset,
+ op.extent.length, op.extent.truncate_size,
+ op.extent.truncate_seq);
+ if (op_finisher == nullptr) {
+ result = do_sparse_read(ctx, osd_op);
} else {
- // read into a buffer
- map<uint64_t, uint64_t> m;
- uint32_t total_read = 0;
- int r = osd->store->fiemap(ch, ghobject_t(soid, ghobject_t::NO_GEN,
- info.pgid.shard),
- op.extent.offset, op.extent.length, m);
- if (r < 0) {
- result = r;
- break;
- }
- map<uint64_t, uint64_t>::iterator miter;
- bufferlist data_bl;
- uint64_t last = op.extent.offset;
- for (miter = m.begin(); miter != m.end(); ++miter) {
- // verify hole?
- if (cct->_conf->osd_verify_sparse_read_holes &&
- last < miter->first) {
- bufferlist t;
- uint64_t len = miter->first - last;
- r = pgbackend->objects_read_sync(soid, last, len, op.flags, &t);
- if (r == -EIO) {
- r = rep_repair_primary_object(soid, ctx->op);
- }
- if (r < 0) {
- osd->clog->error() << coll << " " << soid
- << " sparse-read failed to read: "
- << r;
- } else if (!t.is_zero()) {
- osd->clog->error() << coll << " " << soid << " sparse-read found data in hole "
- << last << "~" << len;
- }
- }
-
- bufferlist tmpbl;
- r = pgbackend->objects_read_sync(soid, miter->first, miter->second, op.flags, &tmpbl);
- if (r < 0) {
- result = r;
- break;
- }
-
- if (r < (int)miter->second) /* this is usually happen when we get extent that exceeds the actual file size */
- miter->second = r;
- total_read += r;
- dout(10) << "sparse-read " << miter->first << "@" << miter->second << dendl;
- data_bl.claim_append(tmpbl);
- last = miter->first + r;
- }
-
- if (r < 0) {
- result = r;
- break;
- }
-
- // verify trailing hole?
- if (cct->_conf->osd_verify_sparse_read_holes) {
- uint64_t end = MIN(op.extent.offset + op.extent.length, oi.size);
- if (last < end) {
- bufferlist t;
- uint64_t len = end - last;
- r = pgbackend->objects_read_sync(soid, last, len, op.flags, &t);
- if (r < 0) {
- osd->clog->error() << coll << " " << soid
- << " sparse-read failed to read: "
- << r;
- } else if (!t.is_zero()) {
- osd->clog->error() << coll << " " << soid << " sparse-read found data in hole "
- << last << "~" << len;
- }
- }
- }
-
- // Why SPARSE_READ need checksum? In fact, librbd always use sparse-read.
- // Maybe at first, there is no much whole objects. With continued use, more and more whole object exist.
- // So from this point, for spare-read add checksum make sense.
- if (total_read == oi.size && oi.is_data_digest()) {
- uint32_t crc = data_bl.crc32c(-1);
- if (oi.data_digest != crc) {
- osd->clog->error() << info.pgid << std::hex
- << " full-object read crc 0x" << crc
- << " != expected 0x" << oi.data_digest
- << std::dec << " on " << soid;
- // FIXME fall back to replica or something?
- result = -EIO;
- break;
- }
- }
-
- op.extent.length = total_read;
-
- ::encode(m, osd_op.outdata); // re-encode since it might be modified
- ::encode_destructively(data_bl, osd_op.outdata);
-
- dout(10) << " sparse_read got " << total_read << " bytes from object " << soid << dendl;
+ result = op_finisher->execute();
}
- ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
- ctx->delta_stats.num_rd++;
break;
case CEPH_OSD_OP_CALL:
case CEPH_OSD_OP_COPY_GET:
++ctx->num_read;
- tracepoint(osd, do_osd_op_pre_copy_get, soid.oid.name.c_str(), soid.snap.val);
- result = fill_in_copy_get(ctx, bp, osd_op, ctx->obc);
+ tracepoint(osd, do_osd_op_pre_copy_get, soid.oid.name.c_str(),
+ soid.snap.val);
+ if (op_finisher == nullptr) {
+ result = do_copy_get(ctx, bp, osd_op, ctx->obc);
+ } else {
+ result = op_finisher->execute();
+ }
break;
case CEPH_OSD_OP_COPY_FROM:
src_oloc.hash,
src_snapid,
src_version);
- if (!ctx->copy_cb) {
+ if (op_finisher == nullptr) {
// start
pg_t raw_pg;
get_osdmap()->object_locator_to_pg(src_name, src_oloc, raw_pg);
result = -EINVAL;
break;
}
- CopyFromCallback *cb = new CopyFromCallback(ctx);
- ctx->copy_cb = cb;
+ CopyFromCallback *cb = new CopyFromCallback(ctx, osd_op);
+ ctx->op_finishers[ctx->current_osd_subop_num].reset(
+ new CopyFromFinisher(cb));
start_copy(cb, ctx->obc, src, src_oloc, src_version,
op.copy_from.flags,
false,
result = -EINPROGRESS;
} else {
// finish
- assert(ctx->copy_cb->get_result() >= 0);
- finish_copyfrom(ctx);
- result = 0;
+ result = op_finisher->execute();
+ assert(result == 0);
+
+ // COPY_FROM cannot be executed multiple times -- it must restart
+ ctx->op_finishers.erase(ctx->current_osd_subop_num);
}
}
break;
&rollback_to, false, false, &missing_oid);
if (ret == -EAGAIN) {
/* clone must be missing */
- assert(is_missing_object(missing_oid));
- dout(20) << "_rollback_to attempted to roll back to a missing object "
+ assert(is_degraded_or_backfilling_object(missing_oid));
+ dout(20) << "_rollback_to attempted to roll back to a missing or backfilling clone "
<< missing_oid << " (requested snapid: ) " << snapid << dendl;
block_write_on_degraded_snap(missing_oid, ctx->op);
return ret;
int PrimaryLogPG::prepare_transaction(OpContext *ctx)
{
- assert(!ctx->ops.empty());
-
+ assert(!ctx->ops->empty());
+
const hobject_t& soid = ctx->obs->oi.soid;
// valid snap context?
}
// prepare the actual mutation
- int result = do_osd_ops(ctx, ctx->ops);
+ int result = do_osd_ops(ctx, *ctx->ops);
if (result < 0) {
if (ctx->op->may_write() &&
get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
const MOSDOp *m = static_cast<const MOSDOp*>(ctx->op->get_req());
assert(ctx->async_reads_complete());
- for (vector<OSDOp>::iterator p = ctx->ops.begin();
- p != ctx->ops.end() && result >= 0; ++p) {
+ for (vector<OSDOp>::iterator p = ctx->ops->begin();
+ p != ctx->ops->end() && result >= 0; ++p) {
if (p->rval < 0 && !(p->op.flags & CEPH_OSD_OP_FLAG_FAILOK)) {
result = p->rval;
break;
}
ctx->bytes_read += p->outdata.length();
}
- ctx->reply->claim_op_out_data(ctx->ops);
- ctx->reply->get_header().data_off = ctx->data_off;
+ ctx->reply->claim_op_out_data(*ctx->ops);
+ ctx->reply->get_header().data_off = (ctx->data_off ? *ctx->data_off : 0);
MOSDOpReply *reply = ctx->reply;
ctx->reply = nullptr;
C_CopyFrom_AsyncReadCb(OSDOp *osd_op, uint64_t features) :
osd_op(osd_op), features(features), len(0) {}
void finish(int r) override {
+ osd_op->rval = r;
+ if (r < 0) {
+ return;
+ }
+
assert(len > 0);
assert(len <= reply_obj.data.length());
bufferlist bl;
}
};
-int PrimaryLogPG::fill_in_copy_get(
- OpContext *ctx,
- bufferlist::iterator& bp,
- OSDOp& osd_op,
- ObjectContextRef &obc)
+int PrimaryLogPG::do_copy_get(OpContext *ctx, bufferlist::iterator& bp,
+ OSDOp& osd_op, ObjectContextRef &obc)
{
object_info_t& oi = obc->obs.oi;
hobject_t& soid = oi.soid;
make_pair(
boost::make_tuple(cursor.data_offset, max_read, osd_op.op.flags),
make_pair(&bl, cb)));
- result = max_read;
- cb->len = result;
+ cb->len = max_read;
+
+ ctx->op_finishers[ctx->current_osd_subop_num].reset(
+ new ReadFinisher(osd_op));
+ result = -EINPROGRESS;
+
+ dout(10) << __func__ << ": async_read noted for " << soid << dendl;
} else {
result = pgbackend->objects_read_sync(
- oi.soid, cursor.data_offset, left, osd_op.op.flags, &bl);
+ oi.soid, cursor.data_offset, max_read, osd_op.op.flags, &bl);
if (result < 0)
return result;
}
- assert(result <= left);
- left -= result;
- cursor.data_offset += result;
+ left -= max_read;
+ cursor.data_offset += max_read;
}
if (cursor.data_offset == oi.size) {
cursor.data_complete = true;
if (cb && !async_read_started) {
delete cb;
}
- result = 0;
+
+ if (result > 0) {
+ result = 0;
+ }
return result;
}
cop->temp_cursor = cop->cursor;
}
-void PrimaryLogPG::finish_copyfrom(OpContext *ctx)
+void PrimaryLogPG::finish_copyfrom(CopyFromCallback *cb)
{
+ OpContext *ctx = cb->ctx;
dout(20) << "finish_copyfrom on " << ctx->obs->oi.soid << dendl;
- ObjectState& obs = ctx->new_obs;
- CopyFromCallback *cb = static_cast<CopyFromCallback*>(ctx->copy_cb);
+ ObjectState& obs = ctx->new_obs;
if (obs.exists) {
dout(20) << __func__ << ": exists, removing" << dendl;
ctx->op_t->remove(obs.oi.soid);
last_update_applied = applied_version;
if (is_primary()) {
if (scrubber.active) {
- if (last_update_applied == scrubber.subset_last_update) {
+ if (last_update_applied >= scrubber.subset_last_update) {
if (ops_blocked_by_scrub()) {
requeue_scrub(true);
} else {
}
} else {
if (scrubber.active_rep_scrub) {
- if (last_update_applied == static_cast<const MOSDRepScrub*>(
+ if (last_update_applied >= static_cast<const MOSDRepScrub*>(
scrubber.active_rep_scrub->get_req())->scrub_to) {
osd->enqueue_back(
info.pgid,
PrimaryLogPG::OpContextUPtr PrimaryLogPG::simple_opc_create(ObjectContextRef obc)
{
dout(20) << __func__ << " " << obc->obs.oi.soid << dendl;
- vector<OSDOp> ops;
ceph_tid_t rep_tid = osd->get_tid();
osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
- OpContextUPtr ctx(new OpContext(OpRequestRef(), reqid, ops, obc, this));
+ OpContextUPtr ctx(new OpContext(OpRequestRef(), reqid, nullptr, obc, this));
ctx->op_t.reset(new PGTransaction());
ctx->mtime = ceph_clock_now();
return ctx;
ObjectContextRef obc = get_object_context(soid, false);
if (!obc || !obc->obs.exists) {
- dout(20) << __func__ << " missing clone " << soid << dendl;
if (pmissing)
*pmissing = soid;
put_snapset_context(ssc);
- return -ENOENT;
+ if (is_degraded_or_backfilling_object(soid)) {
+ dout(20) << __func__ << " clone is degraded or backfilling " << soid << dendl;
+ return -EAGAIN;
+ } else {
+ dout(20) << __func__ << " missing clone " << soid << dendl;
+ return -ENOENT;
+ }
}
if (!obc->ssc) {
return PULL_NONE;
}
+ if (missing_loc.is_deleted(soid)) {
+ start_recovery_op(soid);
+ assert(!recovering.count(soid));
+ recovering.insert(make_pair(soid, ObjectContextRef()));
+ epoch_t cur_epoch = get_osdmap()->get_epoch();
+ remove_missing_object(soid, v, new FunctionContext(
+ [=](int) {
+ lock();
+ if (!pg_has_reset_since(cur_epoch)) {
+ bool object_missing = false;
+ for (const auto& shard : actingbackfill) {
+ if (shard == pg_whoami)
+ continue;
+ if (peer_missing[shard].is_missing(soid)) {
+ dout(20) << __func__ << ": soid " << soid << " needs to be deleted from replica " << shard << dendl;
+ object_missing = true;
+ break;
+ }
+ }
+ if (!object_missing) {
+ object_stat_sum_t stat_diff;
+ stat_diff.num_objects_recovered = 1;
+ on_global_recover(soid, stat_diff, true);
+ } else {
+ auto recovery_handle = pgbackend->open_recovery_op();
+ pgbackend->recover_delete_object(soid, v, recovery_handle);
+ pgbackend->run_recovery_op(recovery_handle, priority);
+ }
+ }
+ unlock();
+ }));
+ return PULL_YES;
+ }
+
// is this a snapped object? if so, consult the snapset.. we may not need the entire object!
ObjectContextRef obc;
ObjectContextRef head_obc;
osd->send_message_osd_cluster(peer.osd, subop, get_osdmap()->get_epoch());
}
+void PrimaryLogPG::remove_missing_object(const hobject_t &soid,
+ eversion_t v, Context *on_complete)
+{
+ dout(20) << __func__ << " " << soid << " " << v << dendl;
+ assert(on_complete != nullptr);
+ // delete locally
+ ObjectStore::Transaction t;
+ remove_snap_mapped_object(t, soid);
+
+ ObjectRecoveryInfo recovery_info;
+ recovery_info.soid = soid;
+ recovery_info.version = v;
+
+ epoch_t cur_epoch = get_osdmap()->get_epoch();
+ t.register_on_complete(new FunctionContext(
+ [=](int) {
+ lock();
+ if (!pg_has_reset_since(cur_epoch)) {
+ ObjectStore::Transaction t2;
+ on_local_recover(soid, recovery_info, ObjectContextRef(), true, &t2);
+ t2.register_on_complete(on_complete);
+ int r = osd->store->queue_transaction(osr.get(), std::move(t2), nullptr);
+ assert(r == 0);
+ unlock();
+ } else {
+ unlock();
+ on_complete->complete(-EAGAIN);
+ }
+ }));
+ int r = osd->store->queue_transaction(osr.get(), std::move(t), nullptr);
+ assert(r == 0);
+}
void PrimaryLogPG::finish_degraded_object(const hobject_t& oid)
{
dout(10) << "finish_degraded_object " << oid << dendl;
- ObjectContextRef obc(object_contexts.lookup(oid));
if (callbacks_for_degraded_object.count(oid)) {
list<Context*> contexts;
contexts.swap(callbacks_for_degraded_object[oid]);
void PrimaryLogPG::_applied_recovered_object(ObjectContextRef obc)
{
lock();
- dout(10) << "_applied_recovered_object " << *obc << dendl;
-
+ dout(20) << __func__ << dendl;
+ if (obc) {
+ dout(20) << "obc = " << *obc << dendl;
+ }
assert(active_pushes >= 1);
--active_pushes;
requeue_scrub(false);
}
}
-
unlock();
}
void PrimaryLogPG::_applied_recovered_object_replica()
{
lock();
- dout(10) << "_applied_recovered_object_replica" << dendl;
-
+ dout(20) << __func__ << dendl;
assert(active_pushes >= 1);
--active_pushes;
PGQueueable(scrubber.active_rep_scrub, get_osdmap()->get_epoch()));
scrubber.active_rep_scrub = OpRequestRef();
}
-
unlock();
}
std::move(manager),
boost::optional<std::function<void(void)> >(
[this, oids, con, num_unfound, tid]() {
- for (auto oid: oids)
+ if (perform_deletes_during_peering()) {
+ for (auto oid : oids) {
+ // clear old locations - merge_new_log_entries will have
+ // handled rebuilding missing_loc for each of these
+ // objects if we have the RECOVERY_DELETES flag
missing_loc.recovered(oid);
+ }
+ }
+
for (auto& p : waiting_for_unreadable_object) {
release_backoffs(p.first);
}
on_shutdown();
}
+void PrimaryLogPG::clear_async_reads()
+{
+ dout(10) << __func__ << dendl;
+ for(auto& i : in_progress_async_reads) {
+ dout(10) << "clear ctx: "
+ << "OpRequestRef " << i.first
+ << " OpContext " << i.second
+ << dendl;
+ close_op_ctx(i.second);
+ }
+}
+
void PrimaryLogPG::on_shutdown()
{
dout(10) << "on_shutdown" << dendl;
context_registry_on_change();
object_contexts.clear();
+ clear_async_reads();
+
osd->remote_reserver.cancel_reservation(info.pgid);
osd->local_reserver.cancel_reservation(info.pgid);
void PrimaryLogPG::_on_new_interval()
{
+ dout(20) << __func__ << "checking missing set deletes flag. missing = " << pg_log.get_missing() << dendl;
+ if (!pg_log.get_missing().may_include_deletes &&
+ get_osdmap()->test_flag(CEPH_OSDMAP_RECOVERY_DELETES)) {
+ pg_log.rebuild_missing_set_with_deletes(osd->store, coll, info);
+ }
+ assert(pg_log.get_missing().may_include_deletes == get_osdmap()->test_flag(CEPH_OSDMAP_RECOVERY_DELETES));
}
void PrimaryLogPG::on_change(ObjectStore::Transaction *t)
return false;
}
- const pg_missing_t &missing = pg_log.get_missing();
+ const auto &missing = pg_log.get_missing();
unsigned int num_missing = missing.num_missing();
uint64_t num_unfound = get_num_unfound();
if (missing.num_missing() > 0) {
// this shouldn't happen!
- osd->clog->error() << info.pgid << " recovery ending with " << missing.num_missing()
- << ": " << missing.get_items();
+ osd->clog->error() << info.pgid << " Unexpected Error: recovery ending with "
+ << missing.num_missing() << ": " << missing.get_items();
return work_in_progress;
}
if (needs_recovery()) {
// this shouldn't happen!
// We already checked num_missing() so we must have missing replicas
- osd->clog->error() << info.pgid << " recovery ending with missing replicas";
+ osd->clog->error() << info.pgid
+ << " Unexpected Error: recovery ending with missing replicas";
return work_in_progress;
}
if (state_test(PG_STATE_RECOVERING)) {
state_clear(PG_STATE_RECOVERING);
+ state_clear(PG_STATE_FORCED_RECOVERY);
if (needs_backfill()) {
dout(10) << "recovery done, queuing backfill" << dendl;
queue_peering_event(
} else {
dout(10) << "recovery done, no backfill" << dendl;
eio_errors_to_process = false;
+ state_clear(PG_STATE_FORCED_BACKFILL);
queue_peering_event(
CephPeeringEvtRef(
std::make_shared<CephPeeringEvt>(
}
} else { // backfilling
state_clear(PG_STATE_BACKFILL);
+ state_clear(PG_STATE_FORCED_BACKFILL);
+ state_clear(PG_STATE_FORCED_RECOVERY);
dout(10) << "recovery done, backfill done" << dendl;
eio_errors_to_process = false;
queue_peering_event(
{
assert(is_primary());
- const pg_missing_t &missing = pg_log.get_missing();
+ const auto &missing = pg_log.get_missing();
dout(10) << "recover_primary recovering " << recovering.size()
<< " in pg" << dendl;
if (pg_log.get_log().objects.count(p->second)) {
latest = pg_log.get_log().objects.find(p->second)->second;
- assert(latest->is_update());
+ assert(latest->is_update() || latest->is_delete());
soid = latest->soid;
} else {
latest = 0;
return uhoh;
}
+int PrimaryLogPG::prep_object_replica_deletes(
+ const hobject_t& soid, eversion_t v,
+ PGBackend::RecoveryHandle *h)
+{
+ assert(is_primary());
+ dout(10) << __func__ << ": on " << soid << dendl;
+
+ start_recovery_op(soid);
+ assert(!recovering.count(soid));
+ recovering.insert(make_pair(soid, ObjectContextRef()));
+
+ pgbackend->recover_delete_object(soid, v, h);
+ return 1;
+}
+
int PrimaryLogPG::prep_object_replica_pushes(
const hobject_t& soid, eversion_t v,
PGBackend::RecoveryHandle *h)
continue;
}
+ if (missing_loc.is_deleted(soid)) {
+ dout(10) << __func__ << ": " << soid << " is a delete, removing" << dendl;
+ map<hobject_t,pg_missing_item>::const_iterator r = m.get_items().find(soid);
+ started += prep_object_replica_deletes(soid, r->second.need, h);
+ continue;
+ }
+
if (soid.is_snap() && pg_log.get_missing().is_missing(soid.get_head())) {
dout(10) << __func__ << ": " << soid.get_head()
<< " still missing on primary" << dendl;
for (unsigned int i = 0 ; i < peers.size(); ++i) {
map<pg_shard_t, pg_missing_t>::iterator bpm = peer_missing.find(peers[i]);
assert(bpm != peer_missing.end());
- bpm->second.add(oid, eversion_t(), eversion_t());
+ bpm->second.add(oid, eversion_t(), eversion_t(), false);
}
assert(!recovering.count(oid));
continue;
did.insert(p->soid);
- if (p->is_delete()) {
+ if (p->is_delete() && !is_missing_object(p->soid)) {
dout(10) << " checking " << p->soid
<< " at " << p->version << dendl;
struct stat st;
if (!allow_incomplete_clones) {
next_clone.snap = **curclone;
clog->error() << mode << " " << pgid << " " << head.get()
- << " expected clone " << next_clone;
+ << " expected clone " << next_clone << " " << missing
+ << " missing";
++scrubber.shallow_errors;
e.set_clone_missing(next_clone.snap);
}
ObjectContextRef obc = get_object_context(p->first, false);
if (!obc) {
osd->clog->error() << info.pgid << " " << mode
- << " cannot get object context for "
+ << " cannot get object context for object "
<< p->first;
continue;
} else if (obc->obs.oi.soid != p->first) {
ObjectContextRef obc = get_object_context(p.first, true);
if (!obc) {
osd->clog->error() << info.pgid << " " << mode
- << " cannot get object context for "
+ << " cannot get object context for object "
<< p.first;
continue;
} else if (obc->obs.oi.soid != p.first) {