uint32_t flags; // object_copy_data_t::FLAG_*
uint32_t source_data_digest, source_omap_digest;
uint32_t data_digest, omap_digest;
- vector<pair<osd_reqid_t, version_t> > reqids; // [(reqid, user_version)]
+ mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > reqids; // [(reqid, user_version)]
map<string, bufferlist> attrs; // xattrs
uint64_t truncate_seq;
uint64_t truncate_size;
typedef boost::tuple<int, CopyResults*> CopyCallbackResults;
friend class CopyFromCallback;
+ friend class CopyFromFinisher;
friend class PromoteCallback;
struct ProxyReadOp {
const hobject_t &oid,
const ObjectRecoveryInfo &recovery_info,
ObjectContextRef obc,
+ bool is_delete,
ObjectStore::Transaction *t
) override;
void on_peer_recover(
const hobject_t oid) override;
void on_global_recover(
const hobject_t &oid,
- const object_stat_sum_t &stat_diff) override;
+ const object_stat_sum_t &stat_diff,
+ bool is_delete) override;
void failed_push(const list<pg_shard_t> &from, const hobject_t &soid) override;
+ void primary_failed(const hobject_t &soid) override;
+ bool primary_error(const hobject_t& soid, eversion_t v) override;
void cancel_pull(const hobject_t &soid) override;
void apply_stats(
const hobject_t &soid,
const object_stat_sum_t &delta_stats) override;
+ void on_primary_error(const hobject_t &oid, eversion_t v) override;
+ void remove_missing_object(const hobject_t &oid,
+ eversion_t v,
+ Context *on_complete) override;
template<class T> class BlessedGenContext;
class BlessedContext;
ceph_tid_t get_tid() override { return osd->get_tid(); }
LogClientTemp clog_error() override { return osd->clog->error(); }
+ LogClientTemp clog_warn() override { return osd->clog->warn(); }
struct watch_disconnect_t {
uint64_t cookie;
ObjectContextRef obc,
const list<watch_disconnect_t> &to_disconnect);
+ struct OpFinisher {
+ virtual ~OpFinisher() {
+ }
+
+ virtual int execute() = 0;
+ };
+
/*
* Capture all object state associated with an in-progress read or write.
*/
struct OpContext {
OpRequestRef op;
osd_reqid_t reqid;
- vector<OSDOp> &ops;
+ vector<OSDOp> *ops;
const ObjectState *obs; // Old objectstate
const SnapSet *snapset; // Old snapset
ObjectContextRef clone_obc; // if we created a clone
ObjectContextRef snapset_obc; // if we created/deleted a snapdir
- int data_off; // FIXME: we may want to kill this msgr hint off at some point!
+ // FIXME: we may want to kill this msgr hint off at some point!
+ boost::optional<int> data_off = boost::none;
MOSDOpReply *reply;
int num_read; ///< count read ops
int num_write; ///< count update ops
- vector<pair<osd_reqid_t, version_t> > extra_reqids;
-
- CopyFromCallback *copy_cb;
+ mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > extra_reqids;
hobject_t new_temp_oid, discard_temp_oid; ///< temp objects we should start/stop tracking
// pending async reads <off, len, op_flags> -> <outbl, outr>
list<pair<boost::tuple<uint64_t, uint64_t, unsigned>,
pair<bufferlist*, Context*> > > pending_async_reads;
- int async_read_result;
int inflightreads;
friend struct OnReadComplete;
void start_async_reads(PrimaryLogPG *pg);
ObjectContext::RWState::State lock_type;
ObcLockManager lock_manager;
+ std::map<int, std::unique_ptr<OpFinisher>> op_finishers;
+
OpContext(const OpContext& other);
const OpContext& operator=(const OpContext& other);
- OpContext(OpRequestRef _op, osd_reqid_t _reqid, vector<OSDOp>& _ops,
+ OpContext(OpRequestRef _op, osd_reqid_t _reqid, vector<OSDOp>* _ops,
ObjectContextRef& obc,
PrimaryLogPG *_pg) :
op(_op), reqid(_reqid), ops(_ops),
bytes_written(0), bytes_read(0), user_at_version(0),
current_osd_subop_num(0),
obc(obc),
- data_off(0), reply(NULL), pg(_pg),
+ reply(NULL), pg(_pg),
num_read(0),
num_write(0),
- copy_cb(NULL),
sent_reply(false),
- async_read_result(0),
inflightreads(0),
lock_type(ObjectContext::RWState::RWNONE) {
if (obc->ssc) {
}
}
OpContext(OpRequestRef _op, osd_reqid_t _reqid,
- vector<OSDOp>& _ops, PrimaryLogPG *_pg) :
+ vector<OSDOp>* _ops, PrimaryLogPG *_pg) :
op(_op), reqid(_reqid), ops(_ops), obs(NULL), snapset(0),
modify(false), user_modify(false), undirty(false), cache_evict(false),
ignore_cache(false), ignore_log_op_stats(false), update_log_only(false),
bytes_written(0), bytes_read(0), user_at_version(0),
current_osd_subop_num(0),
- data_off(0), reply(NULL), pg(_pg),
+ reply(NULL), pg(_pg),
num_read(0),
num_write(0),
- copy_cb(NULL),
- async_read_result(0),
inflightreads(0),
lock_type(ObjectContext::RWState::RWNONE) {}
void reset_obs(ObjectContextRef obc) {
*
* @param ctx [in] ctx to clean up
*/
- void close_op_ctx(OpContext *ctx) {
- release_object_locks(ctx->lock_manager);
- ctx->op_t.reset();
- for (auto p = ctx->on_finish.begin();
- p != ctx->on_finish.end();
- ctx->on_finish.erase(p++)) {
- (*p)();
- }
- delete ctx;
- }
+ void close_op_ctx(OpContext *ctx);
/**
* Releases locks
* Also used to store error log entries for dup detection.
*/
void submit_log_entries(
- const mempool::osd::list<pg_log_entry_t> &entries,
+ const mempool::osd_pglog::list<pg_log_entry_t> &entries,
ObcLockManager &&manager,
boost::optional<std::function<void(void)> > &&on_complete,
OpRequestRef op = OpRequestRef(),
int prep_object_replica_pushes(const hobject_t& soid, eversion_t v,
PGBackend::RecoveryHandle *h);
+ int prep_object_replica_deletes(const hobject_t& soid, eversion_t v,
+ PGBackend::RecoveryHandle *h);
void finish_degraded_object(const hobject_t& oid);
bool must_promote,
bool in_hit_set,
ObjectContextRef *promote_obc);
+ cache_result_t maybe_handle_manifest_detail(OpRequestRef op,
+ bool write_ordered,
+ ObjectContextRef obc);
+ bool maybe_handle_manifest(OpRequestRef op,
+ bool write_ordered,
+ ObjectContextRef obc) {
+ return cache_result_t::NOOP != maybe_handle_manifest_detail(
+ op,
+ write_ordered,
+ obc);
+ }
+
/**
* This helper function is called from do_op if the ObjectContext lookup fails.
* @returns true if the caching code is handling the Op, false otherwise.
ThreadPool::TPHandle &handle ///< [in] tp handle
);
- void prep_backfill_object_push(
+ int prep_backfill_object_push(
hobject_t oid, eversion_t v, ObjectContextRef obc,
vector<pg_shard_t> peers,
PGBackend::RecoveryHandle *h);
// -- copyfrom --
map<hobject_t, CopyOpRef> copy_ops;
- int fill_in_copy_get(
- OpContext *ctx,
- bufferlist::iterator& bp,
- OSDOp& op,
- ObjectContextRef& obc);
+ int do_copy_get(OpContext *ctx, bufferlist::iterator& bp, OSDOp& op,
+ ObjectContextRef& obc);
+ int finish_copy_get();
+
void fill_in_copy_get_noent(OpRequestRef& op, hobject_t oid,
OSDOp& osd_op);
return size;
}
void _copy_some(ObjectContextRef obc, CopyOpRef cop);
- void finish_copyfrom(OpContext *ctx);
+ void finish_copyfrom(CopyFromCallback *cb);
void finish_promote(int r, CopyResults *results, ObjectContextRef obc);
void cancel_copy(CopyOpRef cop, bool requeue);
void cancel_copy_ops(bool requeue);
int do_xattr_cmp_str(int op, string& v1s, bufferlist& xattr);
// -- checksum --
- int do_checksum(OpContext *ctx, OSDOp& osd_op, bufferlist::iterator *bl_it,
- bool *async_read);
+ int do_checksum(OpContext *ctx, OSDOp& osd_op, bufferlist::iterator *bl_it);
int finish_checksum(OSDOp& osd_op, Checksummer::CSumType csum_type,
bufferlist::iterator *init_value_bl_it,
const bufferlist &read_bl);
friend class C_ChecksumRead;
int do_extent_cmp(OpContext *ctx, OSDOp& osd_op);
+ int finish_extent_cmp(OSDOp& osd_op, const bufferlist &read_bl);
+
+ friend class C_ExtentCmpRead;
+
+ int do_read(OpContext *ctx, OSDOp& osd_op);
+ int do_sparse_read(OpContext *ctx, OSDOp& osd_op);
int do_writesame(OpContext *ctx, OSDOp& osd_op);
bool pgls_filter(PGLSFilter *filter, hobject_t& sobj, bufferlist& outdata);
// -- proxyread --
map<ceph_tid_t, ProxyReadOpRef> proxyread_ops;
- void do_proxy_read(OpRequestRef op);
+ void do_proxy_read(OpRequestRef op, ObjectContextRef obc = NULL);
void finish_proxy_read(hobject_t oid, ceph_tid_t tid, int r);
void cancel_proxy_read(ProxyReadOpRef prdop);
// -- proxywrite --
map<ceph_tid_t, ProxyWriteOpRef> proxywrite_ops;
- void do_proxy_write(OpRequestRef op, const hobject_t& missing_oid);
+ void do_proxy_write(OpRequestRef op, const hobject_t& missing_oid, ObjectContextRef obc = NULL);
void finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r);
void cancel_proxy_write(ProxyWriteOpRef pwop);
void handle_backoff(OpRequestRef& op);
- OpContextUPtr trim_object(bool first, const hobject_t &coid);
+ int trim_object(bool first, const hobject_t &coid, OpContextUPtr *ctxp);
void snap_trimmer(epoch_t e) override;
void kick_snap_trim() override;
void snap_trimmer_scrub_complete() override;
};
auto *pg = context< SnapTrimmer >().pg;
if (pg->cct->_conf->osd_snap_trim_sleep > 0) {
- wakeup = new OnTimer{pg, pg->get_osdmap()->get_epoch()};
Mutex::Locker l(pg->osd->snap_sleep_lock);
- pg->osd->snap_sleep_timer.add_event_after(
- pg->cct->_conf->osd_snap_trim_sleep, wakeup);
+ wakeup = pg->osd->snap_sleep_timer.add_event_after(
+ pg->cct->_conf->osd_snap_trim_sleep,
+ new OnTimer{pg, pg->get_osdmap()->get_epoch()});
} else {
post_event(SnapTrimTimerReady());
}
pending = nullptr;
auto *pg = context< SnapTrimmer >().pg;
pg->state_clear(PG_STATE_SNAPTRIM_WAIT);
+ pg->state_clear(PG_STATE_SNAPTRIM_ERROR);
pg->publish_stats_to_osd();
}
};
void block_write_on_full_cache(
const hobject_t& oid, OpRequestRef op);
+ void block_for_clean(
+ const hobject_t& oid, OpRequestRef op);
void block_write_on_snap_rollback(
const hobject_t& oid, ObjectContextRef obc, OpRequestRef op);
void block_write_on_degraded_snap(const hobject_t& oid, OpRequestRef op);
void on_role_change() override;
void on_pool_change() override;
void _on_new_interval() override;
+ void clear_async_reads();
void on_change(ObjectStore::Transaction *t) override;
void on_activate() override;
void on_flushed() override;
void on_shutdown() override;
bool check_failsafe_full(ostream &ss) override;
bool check_osdmap_full(const set<pg_shard_t> &missing_on) override;
+ int rep_repair_primary_object(const hobject_t& soid, OpRequestRef op);
// attr cache handling
void setattr_maybe_cache(