#define CEPH_REPLICATEDPG_H
#include <boost/tuple/tuple.hpp>
-#include "include/assert.h"
+#include "include/ceph_assert.h"
+#include "DynamicPerfStats.h"
+#include "OSD.h"
#include "PG.h"
#include "Watch.h"
#include "TierAgentState.h"
#include "messages/MOSDOpReply.h"
#include "common/Checksummer.h"
#include "common/sharedptr_registry.hpp"
+#include "common/shared_cache.hpp"
#include "ReplicatedBackend.h"
#include "PGTransaction.h"
+#include "cls/cas/cls_cas_ops.h"
class CopyFromCallback;
class PromoteCallback;
uint32_t source_data_digest, source_omap_digest;
uint32_t data_digest, omap_digest;
mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > reqids; // [(reqid, user_version)]
+ mempool::osd_pglog::map<uint32_t, int> reqid_return_codes; // map reqids by index to error code
map<string, bufferlist> attrs; // xattrs
uint64_t truncate_seq;
uint64_t truncate_size;
{}
};
+ struct CopyOp;
+ typedef std::shared_ptr<CopyOp> CopyOpRef;
+
struct CopyOp {
CopyCallback *cb;
ObjectContextRef obc;
unsigned src_obj_fadvise_flags;
unsigned dest_obj_fadvise_flags;
+ map<uint64_t, CopyOpRef> chunk_cops;
+ int num_chunk;
+ bool failed;
+ uint64_t start_offset = 0;
+ uint64_t last_offset = 0;
+ vector<OSDOp> chunk_ops;
+
CopyOp(CopyCallback *cb_, ObjectContextRef _obc, hobject_t s,
object_locator_t l,
version_t v,
objecter_tid2(0),
rval(-1),
src_obj_fadvise_flags(src_obj_fadvise_flags),
- dest_obj_fadvise_flags(dest_obj_fadvise_flags)
+ dest_obj_fadvise_flags(dest_obj_fadvise_flags),
+ num_chunk(0),
+ failed(false)
{
results.user_version = v;
results.mirror_snapset = mirror_snapset;
}
};
- typedef ceph::shared_ptr<CopyOp> CopyOpRef;
/**
* The CopyCallback class defines an interface for completions to the
friend class CopyFromCallback;
friend class CopyFromFinisher;
friend class PromoteCallback;
+ friend class PromoteFinisher;
struct ProxyReadOp {
OpRequestRef op;
user_version(0), data_offset(0),
canceled(false) { }
};
- typedef ceph::shared_ptr<ProxyReadOp> ProxyReadOpRef;
+ typedef std::shared_ptr<ProxyReadOp> ProxyReadOpRef;
struct ProxyWriteOp {
OpContext *ctx;
canceled(false),
reqid(_reqid) { }
};
- typedef ceph::shared_ptr<ProxyWriteOp> ProxyWriteOpRef;
+ typedef std::shared_ptr<ProxyWriteOp> ProxyWriteOpRef;
struct FlushOp {
ObjectContextRef obc; ///< obc we are flushing
bool removal; ///< we are removing the backend object
boost::optional<std::function<void()>> on_flush; ///< callback, may be null
// for chunked object
- map<uint64_t, ceph_tid_t> io_tids;
+ map<uint64_t, int> io_results;
+ map<uint64_t, ceph_tid_t> io_tids;
+ uint64_t chunks;
FlushOp()
: flushed_version(0), objecter_tid(0), rval(0),
- blocking(false), removal(false) {}
- ~FlushOp() { assert(!on_flush); }
+ blocking(false), removal(false), chunks(0) {}
+ ~FlushOp() { ceph_assert(!on_flush); }
};
- typedef ceph::shared_ptr<FlushOp> FlushOpRef;
+ typedef std::shared_ptr<FlushOp> FlushOpRef;
boost::scoped_ptr<PGBackend> pgbackend;
PGBackend *get_pgbackend() override {
return pgbackend.get();
}
+ const PGBackend *get_pgbackend() const override {
+ return pgbackend.get();
+ }
+
/// Listener methods
DoutPrefixProvider *get_dpp() override {
return this;
Context *on_complete) override;
template<class T> class BlessedGenContext;
+ template<class T> class UnlockedBlessedGenContext;
class BlessedContext;
Context *bless_context(Context *c) override;
GenContext<ThreadPool::TPHandle&> *bless_gencontext(
GenContext<ThreadPool::TPHandle&> *c) override;
+ GenContext<ThreadPool::TPHandle&> *bless_unlocked_gencontext(
+ GenContext<ThreadPool::TPHandle&> *c) override;
void send_message(int to_osd, Message *m) override {
- osd->send_message_osd_cluster(to_osd, m, get_osdmap()->get_epoch());
+ osd->send_message_osd_cluster(to_osd, m, get_osdmap_epoch());
}
void queue_transaction(ObjectStore::Transaction&& t,
OpRequestRef op) override {
- osd->store->queue_transaction(osr.get(), std::move(t), 0, 0, 0, op);
+ osd->store->queue_transaction(ch, std::move(t), op);
}
void queue_transactions(vector<ObjectStore::Transaction>& tls,
OpRequestRef op) override {
- osd->store->queue_transactions(osr.get(), tls, 0, 0, 0, op, NULL);
- }
- epoch_t get_epoch() const override {
- return get_osdmap()->get_epoch();
+ osd->store->queue_transactions(ch, tls, op, NULL);
}
epoch_t get_interval_start_epoch() const override {
return info.history.same_interval_since;
epoch_t get_last_peering_reset_epoch() const override {
return get_last_peering_reset();
}
- const set<pg_shard_t> &get_actingbackfill_shards() const override {
- return actingbackfill;
+ const set<pg_shard_t> &get_acting_recovery_backfill_shards() const override {
+ return acting_recovery_backfill;
}
const set<pg_shard_t> &get_acting_shards() const override {
return actingset;
return backfill_targets;
}
- std::string gen_dbg_prefix() const override { return gen_prefix(); }
-
+ std::ostream& gen_dbg_prefix(std::ostream& out) const override {
+ return gen_prefix(out);
+ }
+
const map<hobject_t, set<pg_shard_t>>
&get_missing_loc_shards() const override {
return missing_loc.get_missing_locs();
const PGLog &get_log() const override {
return pg_log;
}
+ void add_local_next_event(const pg_log_entry_t& e) override {
+ pg_log.missing_add_next_entry(e);
+ }
bool pgb_is_primary() const override {
return is_primary();
}
- OSDMapRef pgb_get_osdmap() const override {
+ const OSDMapRef& pgb_get_osdmap() const override final {
return get_osdmap();
}
+ epoch_t pgb_get_osdmap_epoch() const override final {
+ return get_osdmap_epoch();
+ }
const pg_info_t &get_info() const override {
return info;
}
release_object_locks(manager);
}
+ bool pg_is_repair() override {
+ return is_repair();
+ }
+ void inc_osd_stat_repaired() override {
+ osd->inc_osd_stat_repaired();
+ }
+ bool pg_is_remote_backfilling() override {
+ return is_remote_backfilling();
+ }
+ void pg_add_local_num_bytes(int64_t num_bytes) override {
+ add_local_num_bytes(num_bytes);
+ }
+ void pg_sub_local_num_bytes(int64_t num_bytes) override {
+ sub_local_num_bytes(num_bytes);
+ }
+ void pg_add_num_bytes(int64_t num_bytes) override {
+ add_num_bytes(num_bytes);
+ }
+ void pg_sub_num_bytes(int64_t num_bytes) override {
+ sub_num_bytes(num_bytes);
+ }
+
void pgb_set_object_snap_mapping(
const hobject_t &soid,
const set<snapid_t> &snaps,
const eversion_t &trim_to,
const eversion_t &roll_forward_to,
bool transaction_applied,
- ObjectStore::Transaction &t) override {
+ ObjectStore::Transaction &t,
+ bool async = false) override {
if (hset_history) {
info.hit_set = *hset_history;
}
- append_log(logv, trim_to, roll_forward_to, t, transaction_applied);
+ append_log(logv, trim_to, roll_forward_to, t, transaction_applied, async);
}
- struct C_OSD_OnApplied;
- void op_applied(
- const eversion_t &applied_version) override;
+ void op_applied(const eversion_t &applied_version) override;
bool should_send_op(
pg_shard_t peer,
- const hobject_t &hoid) override {
- if (peer == get_primary())
- return true;
- assert(peer_info.count(peer));
- bool should_send =
- hoid.pool != (int64_t)info.pgid.pool() ||
- hoid <= last_backfill_started ||
- hoid <= peer_info[peer].last_backfill;
- if (!should_send)
- assert(is_backfill_targets(peer));
- return should_send;
+ const hobject_t &hoid) override;
+
+ bool pg_is_undersized() const override {
+ return is_undersized();
}
+ bool pg_is_repair() const override {
+ return is_repair();
+ }
+
void update_peer_last_complete_ondisk(
pg_shard_t fromosd,
eversion_t lcod) override {
pg_shard_t primary_shard() const override {
return primary;
}
- uint64_t min_peer_features() const override {
- return get_min_peer_features();
- }
void send_message_osd_cluster(
int peer, Message *m, epoch_t from_epoch) override;
interval_set<uint64_t> modified_ranges;
ObjectContextRef obc;
ObjectContextRef clone_obc; // if we created a clone
- ObjectContextRef snapset_obc; // if we created/deleted a snapdir
+ ObjectContextRef head_obc; // if we also update snapset (see trim_object)
// FIXME: we may want to kill this msgr hint off at some point!
boost::optional<int> data_off = boost::none;
int num_write; ///< count update ops
mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > extra_reqids;
+ mempool::osd_pglog::map<uint32_t, int> extra_reqid_return_codes;
hobject_t new_temp_oid, discard_temp_oid; ///< temp objects we should start/stop tracking
on_committed.emplace_back(std::forward<F>(f));
}
- bool sent_reply;
+ bool sent_reply = false;
// pending async reads <off, len, op_flags> -> <outbl, outr>
list<pair<boost::tuple<uint64_t, uint64_t, unsigned>,
}
}
~OpContext() {
- assert(!op_t);
+ ceph_assert(!op_t);
if (reply)
reply->put();
for (list<pair<boost::tuple<uint64_t, uint64_t, unsigned>,
ceph_tid_t rep_tid;
- bool rep_aborted, rep_done;
-
- bool all_applied;
+ bool rep_aborted;
bool all_committed;
- const bool applies_with_commit;
utime_t start;
ObcLockManager lock_manager;
- list<std::function<void()>> on_applied;
list<std::function<void()>> on_committed;
list<std::function<void()>> on_success;
list<std::function<void()>> on_finish;
RepGather(
OpContext *c, ceph_tid_t rt,
- eversion_t lc,
- bool applies_with_commit) :
+ eversion_t lc) :
hoid(c->obc->obs.oi.soid),
op(c->op),
queue_item(this),
nref(1),
rep_tid(rt),
- rep_aborted(false), rep_done(false),
- all_applied(false), all_committed(false),
- applies_with_commit(applies_with_commit),
+ rep_aborted(false),
+ all_committed(false),
pg_local_last_complete(lc),
lock_manager(std::move(c->lock_manager)),
- on_applied(std::move(c->on_applied)),
on_committed(std::move(c->on_committed)),
on_success(std::move(c->on_success)),
on_finish(std::move(c->on_finish)) {}
boost::optional<std::function<void(void)> > &&on_complete,
ceph_tid_t rt,
eversion_t lc,
- bool applies_with_commit,
int r) :
op(o),
queue_item(this),
nref(1),
r(r),
rep_tid(rt),
- rep_aborted(false), rep_done(false),
- all_applied(false), all_committed(false),
- applies_with_commit(applies_with_commit),
+ rep_aborted(false),
+ all_committed(false),
pg_local_last_complete(lc),
lock_manager(std::move(manager)) {
if (on_complete) {
return this;
}
void put() {
- assert(nref > 0);
+ ceph_assert(nref > 0);
if (--nref == 0) {
- assert(on_applied.empty());
delete this;
//generic_dout(0) << "deleting " << this << dendl;
}
* @return true on success, false if we are queued
*/
bool get_rw_locks(bool write_ordered, OpContext *ctx) {
- /* If snapset_obc, !obc->obs->exists and we will always take the
+ /* If head_obc, !obc->obs->exists and we will always take the
* snapdir lock *before* the head lock. Since all callers will do
* this (read or write) if we get the first we will be guaranteed
* to get the second.
} else if (write_ordered) {
ctx->lock_type = ObjectContext::RWState::RWWRITE;
} else {
- assert(ctx->op->may_read());
+ ceph_assert(ctx->op->may_read());
ctx->lock_type = ObjectContext::RWState::RWREAD;
}
- if (ctx->snapset_obc) {
- assert(!ctx->obc->obs.exists);
+ if (ctx->head_obc) {
+ ceph_assert(!ctx->obc->obs.exists);
if (!ctx->lock_manager.get_lock_type(
ctx->lock_type,
- ctx->snapset_obc->obs.oi.soid,
- ctx->snapset_obc,
+ ctx->head_obc->obs.oi.soid,
+ ctx->head_obc,
ctx->op)) {
ctx->lock_type = ObjectContext::RWState::RWNONE;
return false;
ctx->op)) {
return true;
} else {
- assert(!ctx->snapset_obc);
+ ceph_assert(!ctx->head_obc);
ctx->lock_type = ObjectContext::RWState::RWNONE;
return false;
}
*/
void release_object_locks(
ObcLockManager &lock_manager) {
- list<pair<hobject_t, list<OpRequestRef> > > to_req;
+ list<pair<ObjectContextRef, list<OpRequestRef> > > to_req;
bool requeue_recovery = false;
bool requeue_snaptrim = false;
lock_manager.put_locks(
if (!to_req.empty()) {
// requeue at front of scrub blocking queue if we are blocked by scrub
for (auto &&p: to_req) {
- if (write_blocked_by_scrub(p.first.get_head())) {
+ if (write_blocked_by_scrub(p.first->obs.oi.soid.get_head())) {
for (auto& op : p.second) {
op->mark_delayed("waiting for scrub");
}
+
waiting_for_scrub.splice(
waiting_for_scrub.begin(),
p.second,
// [primary|tail]
xlist<RepGather*> repop_queue;
- friend class C_OSD_RepopApplied;
friend class C_OSD_RepopCommit;
- void repop_all_applied(RepGather *repop);
void repop_all_committed(RepGather *repop);
void eval_repop(RepGather*);
void issue_repop(RepGather *repop, OpContext *ctx);
void simple_opc_submit(OpContextUPtr ctx);
/**
- * Merge entries atomically into all actingbackfill osds
+ * Merge entries atomically into all acting_recovery_backfill osds
* adjusting missing and recovery state as necessary.
*
* Also used to store error log entries for dup detection.
void populate_obc_watchers(ObjectContextRef obc);
void check_blacklisted_obc_watchers(ObjectContextRef obc);
void check_blacklisted_watchers() override;
- void get_watchers(list<obj_watch_item_t> &pg_watchers) override;
+ void get_watchers(list<obj_watch_item_t> *ls) override;
void get_obc_watchers(ObjectContextRef obc, list<obj_watch_item_t> &pg_watchers);
public:
void handle_watch_timeout(WatchRef watch);
bool oid_existed = true //indicate whether this oid existed in the backend
);
void register_snapset_context(SnapSetContext *ssc) {
- Mutex::Locker l(snapset_contexts_lock);
+ std::lock_guard l(snapset_contexts_lock);
_register_snapset_context(ssc);
}
void _register_snapset_context(SnapSetContext *ssc) {
- assert(snapset_contexts_lock.is_locked());
+ ceph_assert(snapset_contexts_lock.is_locked());
if (!ssc->registered) {
- assert(snapset_contexts.count(ssc->oid) == 0);
+ ceph_assert(snapset_contexts.count(ssc->oid) == 0);
ssc->registered = true;
snapset_contexts[ssc->oid] = ssc;
}
bool new_backfill;
int prep_object_replica_pushes(const hobject_t& soid, eversion_t v,
- PGBackend::RecoveryHandle *h);
+ PGBackend::RecoveryHandle *h,
+ bool *work_started);
int prep_object_replica_deletes(const hobject_t& soid, eversion_t v,
- PGBackend::RecoveryHandle *h);
+ PGBackend::RecoveryHandle *h,
+ bool *work_started);
- void finish_degraded_object(const hobject_t& oid);
+ void finish_degraded_object(const hobject_t& oid) override;
// Cancels/resets pulls from peer
void check_recovery_sources(const OSDMapRef& map) override ;
const hobject_t& head, const hobject_t& coid,
object_info_t *poi);
void execute_ctx(OpContext *ctx);
- void finish_ctx(OpContext *ctx, int log_op_type, bool maintain_ssc=true);
+ void finish_ctx(OpContext *ctx, int log_op_type);
void reply_ctx(OpContext *ctx, int err);
void reply_ctx(OpContext *ctx, int err, eversion_t v, version_t uv);
void make_writeable(OpContext *ctx);
- void log_op_stats(OpContext *ctx);
+ void log_op_stats(const OpRequest& op, uint64_t inb, uint64_t outb);
void write_update_size_and_usage(object_stat_sum_t& stats, object_info_t& oi,
interval_set<uint64_t>& modified, uint64_t offset,
uint64_t length, bool write_full=false);
- void add_interval_usage(interval_set<uint64_t>& s, object_stat_sum_t& st);
-
+ inline void truncate_update_size_and_usage(
+ object_stat_sum_t& delta_stats,
+ object_info_t& oi,
+ uint64_t truncate_size);
enum class cache_result_t {
NOOP,
HANDLED_PROXY,
HANDLED_REDIRECT,
REPLIED_WITH_EAGAIN,
+ BLOCKED_RECOVERY,
};
cache_result_t maybe_handle_cache_detail(OpRequestRef op,
bool write_ordered,
ThreadPool::TPHandle &handle, uint64_t *started) override;
uint64_t recover_primary(uint64_t max, ThreadPool::TPHandle &handle);
- uint64_t recover_replicas(uint64_t max, ThreadPool::TPHandle &handle);
+ uint64_t recover_replicas(uint64_t max, ThreadPool::TPHandle &handle,
+ bool *recovery_started);
hobject_t earliest_peer_backfill() const;
bool all_peer_done() const;
/**
void send_remove_op(const hobject_t& oid, eversion_t v, pg_shard_t peer);
- class C_OSD_OndiskWriteUnlock;
class C_OSD_AppliedRecoveredObject;
class C_OSD_CommittedPushedObject;
class C_OSD_AppliedRecoveredObjectReplica;
- void sub_op_remove(OpRequestRef op);
void _applied_recovered_object(ObjectContextRef obc);
void _applied_recovered_object_replica();
// -- copyfrom --
map<hobject_t, CopyOpRef> copy_ops;
- int do_copy_get(OpContext *ctx, bufferlist::iterator& bp, OSDOp& op,
+ int do_copy_get(OpContext *ctx, bufferlist::const_iterator& bp, OSDOp& op,
ObjectContextRef& obc);
int finish_copy_get();
void _write_copy_chunk(CopyOpRef cop, PGTransaction *t);
uint64_t get_copy_chunk_size() const {
uint64_t size = cct->_conf->osd_copyfrom_max_chunk;
- if (pool.info.requires_aligned_append()) {
+ if (pool.info.required_alignment()) {
uint64_t alignment = pool.info.required_alignment();
if (size % alignment) {
size += alignment - (size % alignment);
int do_xattr_cmp_str(int op, string& v1s, bufferlist& xattr);
// -- checksum --
- int do_checksum(OpContext *ctx, OSDOp& osd_op, bufferlist::iterator *bl_it);
+ int do_checksum(OpContext *ctx, OSDOp& osd_op, bufferlist::const_iterator *bl_it);
int finish_checksum(OSDOp& osd_op, Checksummer::CSumType csum_type,
- bufferlist::iterator *init_value_bl_it,
+ bufferlist::const_iterator *init_value_bl_it,
const bufferlist &read_bl);
friend class C_ChecksumRead;
int do_writesame(OpContext *ctx, OSDOp& osd_op);
bool pgls_filter(PGLSFilter *filter, hobject_t& sobj, bufferlist& outdata);
- int get_pgls_filter(bufferlist::iterator& iter, PGLSFilter **pfilter);
+ int get_pgls_filter(bufferlist::const_iterator& iter, PGLSFilter **pfilter);
map<hobject_t, list<OpRequestRef>> in_progress_proxy_ops;
void kick_proxy_ops_blocked(hobject_t& soid);
// -- proxywrite --
map<ceph_tid_t, ProxyWriteOpRef> proxywrite_ops;
- void do_proxy_write(OpRequestRef op, const hobject_t& missing_oid, ObjectContextRef obc = NULL);
- void cancel_and_requeue_proxy_ops(hobject_t oid);
+ void do_proxy_write(OpRequestRef op, ObjectContextRef obc = NULL);
void finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r);
void cancel_proxy_write(ProxyWriteOpRef pwop, vector<ceph_tid_t> *tids);
friend struct C_ProxyWrite_Commit;
+ // -- chunkop --
+ void do_proxy_chunked_op(OpRequestRef op, const hobject_t& missing_oid,
+ ObjectContextRef obc, bool write_ordered);
+ void do_proxy_chunked_read(OpRequestRef op, ObjectContextRef obc, int op_index,
+ uint64_t chunk_index, uint64_t req_offset, uint64_t req_length,
+ uint64_t req_total_len, bool write_ordered);
+ bool can_proxy_chunked_read(OpRequestRef op, ObjectContextRef obc);
+ void _copy_some_manifest(ObjectContextRef obc, CopyOpRef cop, uint64_t start_offset);
+ void process_copy_chunk_manifest(hobject_t oid, ceph_tid_t tid, int r, uint64_t offset);
+ void finish_promote_manifest(int r, CopyResults *results, ObjectContextRef obc);
+ void cancel_and_requeue_proxy_ops(hobject_t oid);
+ int do_manifest_flush(OpRequestRef op, ObjectContextRef obc, FlushOpRef manifest_fop,
+ uint64_t start_offset, bool block);
+ int start_manifest_flush(OpRequestRef op, ObjectContextRef obc, bool blocking,
+ boost::optional<std::function<void()>> &&on_flush);
+ void finish_manifest_flush(hobject_t oid, ceph_tid_t tid, int r, ObjectContextRef obc,
+ uint64_t last_offset);
+ void handle_manifest_flush(hobject_t oid, ceph_tid_t tid, int r,
+ uint64_t offset, uint64_t last_offset, epoch_t lpr);
+ void refcount_manifest(ObjectContextRef obc, object_locator_t oloc, hobject_t soid,
+ SnapContext snapc, bool get, Context *cb, uint64_t offset);
+
+ friend struct C_ProxyChunkRead;
+ friend class PromoteManifestCallback;
+ friend class C_CopyChunk;
+ friend struct C_ManifestFlush;
+ friend struct RefCountCallback;
+
public:
PrimaryLogPG(OSDService *o, OSDMapRef curmap,
- const PGPool &_pool, spg_t p);
+ const PGPool &_pool,
+ const map<string,string>& ec_profile,
+ spg_t p);
~PrimaryLogPG() override {}
int do_command(
ConnectionRef conn,
ceph_tid_t tid) override;
+ void clear_cache();
+ int get_cache_obj_count() {
+ return object_contexts.get_count();
+ }
void do_request(
OpRequestRef& op,
ThreadPool::TPHandle &handle) override;
- void do_op(OpRequestRef& op) override;
+ void do_op(OpRequestRef& op);
void record_write_error(OpRequestRef op, const hobject_t &soid,
MOSDOpReply *orig_reply, int r);
void do_pg_op(OpRequestRef op);
- void do_sub_op(OpRequestRef op) override;
- void do_sub_op_reply(OpRequestRef op) override;
void do_scan(
OpRequestRef op,
- ThreadPool::TPHandle &handle) override;
- void do_backfill(OpRequestRef op) override;
+ ThreadPool::TPHandle &handle);
+ void do_backfill(OpRequestRef op);
void do_backfill_remove(OpRequestRef op);
void handle_backoff(OpRequestRef& op);
int _get_tmap(OpContext *ctx, bufferlist *header, bufferlist *vals);
int do_tmap2omap(OpContext *ctx, unsigned flags);
- int do_tmapup(OpContext *ctx, bufferlist::iterator& bp, OSDOp& osd_op);
- int do_tmapup_slow(OpContext *ctx, bufferlist::iterator& bp, OSDOp& osd_op, bufferlist& bl);
+ int do_tmapup(OpContext *ctx, bufferlist::const_iterator& bp, OSDOp& osd_op);
+ int do_tmapup_slow(OpContext *ctx, bufferlist::const_iterator& bp, OSDOp& osd_op, bufferlist& bl);
void do_osd_op_effects(OpContext *ctx, const ConnectionRef& conn);
private:
hobject_t get_temp_recovery_object(const hobject_t& target,
eversion_t version) override;
int get_recovery_op_priority() const {
- int pri = 0;
- pool.info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri);
- return pri > 0 ? pri : cct->_conf->osd_recovery_op_priority;
+ int64_t pri = 0;
+ pool.info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri);
+ return pri > 0 ? pri : cct->_conf->osd_recovery_op_priority;
}
void log_missing(unsigned missing,
const boost::optional<hobject_t> &head,
void log_enter(const char *state_name);
void log_exit(const char *state_name, utime_t duration);
bool can_trim() {
- return pg->is_clean() && !pg->scrubber.active && !pg->snap_trimq.empty();
+ return
+ pg->is_clean() &&
+ !pg->scrubber.active &&
+ !pg->snap_trimq.empty() &&
+ !pg->get_osdmap()->test_flag(CEPH_OSDMAP_NOSNAPTRIM);
}
} snap_trimmer_machine;
: my_base(ctx),
NamedState(context< SnapTrimmer >().pg, "Trimming") {
context< SnapTrimmer >().log_enter(state_name);
- assert(context< SnapTrimmer >().can_trim());
- assert(in_flight.empty());
+ ceph_assert(context< SnapTrimmer >().can_trim());
+ ceph_assert(in_flight.empty());
}
void exit() {
context< SnapTrimmer >().log_exit(state_name, enter_time);
: my_base(ctx),
NamedState(context< SnapTrimmer >().pg, "Trimming/WaitTrimTimer") {
context< SnapTrimmer >().log_enter(state_name);
- assert(context<Trimming>().in_flight.empty());
+ ceph_assert(context<Trimming>().in_flight.empty());
struct OnTimer : Context {
PrimaryLogPGRef pg;
epoch_t epoch;
};
auto *pg = context< SnapTrimmer >().pg;
if (pg->cct->_conf->osd_snap_trim_sleep > 0) {
- Mutex::Locker l(pg->osd->snap_sleep_lock);
- wakeup = pg->osd->snap_sleep_timer.add_event_after(
+ std::lock_guard l(pg->osd->sleep_lock);
+ wakeup = pg->osd->sleep_timer.add_event_after(
pg->cct->_conf->osd_snap_trim_sleep,
- new OnTimer{pg, pg->get_osdmap()->get_epoch()});
+ new OnTimer{pg, pg->get_osdmap_epoch()});
} else {
post_event(SnapTrimTimerReady());
}
context< SnapTrimmer >().log_exit(state_name, enter_time);
auto *pg = context< SnapTrimmer >().pg;
if (wakeup) {
- Mutex::Locker l(pg->osd->snap_sleep_lock);
- pg->osd->snap_sleep_timer.cancel_event(wakeup);
+ std::lock_guard l(pg->osd->sleep_lock);
+ pg->osd->sleep_timer.cancel_event(wakeup);
wakeup = nullptr;
}
}
: my_base(ctx),
NamedState(context< SnapTrimmer >().pg, "Trimming/WaitRWLock") {
context< SnapTrimmer >().log_enter(state_name);
- assert(context<Trimming>().in_flight.empty());
+ ceph_assert(context<Trimming>().in_flight.empty());
}
void exit() {
context< SnapTrimmer >().log_exit(state_name, enter_time);
: my_base(ctx),
NamedState(context< SnapTrimmer >().pg, "Trimming/WaitRepops") {
context< SnapTrimmer >().log_enter(state_name);
- assert(!context<Trimming>().in_flight.empty());
+ ceph_assert(!context<Trimming>().in_flight.empty());
}
void exit() {
context< SnapTrimmer >().log_exit(state_name, enter_time);
struct ReservationCB : public Context {
PrimaryLogPGRef pg;
bool canceled;
- ReservationCB(PrimaryLogPG *pg) : pg(pg), canceled(false) {}
+ explicit ReservationCB(PrimaryLogPG *pg) : pg(pg), canceled(false) {}
void finish(int) override {
pg->lock();
if (!canceled)
pg->unlock();
}
void cancel() {
- assert(pg->is_locked());
- assert(!canceled);
+ ceph_assert(pg->is_locked());
+ ceph_assert(!canceled);
canceled = true;
}
};
: my_base(ctx),
NamedState(context< SnapTrimmer >().pg, "Trimming/WaitReservation") {
context< SnapTrimmer >().log_enter(state_name);
- assert(context<Trimming>().in_flight.empty());
+ ceph_assert(context<Trimming>().in_flight.empty());
auto *pg = context< SnapTrimmer >().pg;
pending = new ReservationCB(pg);
pg->osd->snap_reserver.request_reservation(
void wait_for_all_missing(OpRequestRef op);
bool is_degraded_or_backfilling_object(const hobject_t& oid);
+ bool is_degraded_on_async_recovery_target(const hobject_t& soid);
void wait_for_degraded_object(const hobject_t& oid, OpRequestRef op);
void block_write_on_full_cache(
const hobject_t& oid, ObjectContextRef obc, OpRequestRef op);
void block_write_on_degraded_snap(const hobject_t& oid, OpRequestRef op);
- bool maybe_await_blocked_snapset(const hobject_t &soid, OpRequestRef op);
+ bool maybe_await_blocked_head(const hobject_t &soid, OpRequestRef op);
void wait_for_blocked_object(const hobject_t& soid, OpRequestRef op);
void kick_object_context_blocked(ObjectContextRef obc);
void on_flushed() override;
void on_removal(ObjectStore::Transaction *t) override;
void on_shutdown() override;
- bool check_failsafe_full(ostream &ss) override;
+ bool check_failsafe_full() override;
bool check_osdmap_full(const set<pg_shard_t> &missing_on) override;
bool maybe_preempt_replica_scrub(const hobject_t& oid) override {
return write_blocked_by_scrub(oid);
}
- int rep_repair_primary_object(const hobject_t& soid, OpRequestRef op);
+ int rep_repair_primary_object(const hobject_t& soid, OpContext *ctx);
// attr cache handling
void setattr_maybe_cache(
ObjectContextRef obc,
- OpContext *op,
PGTransaction *t,
const string &key,
bufferlist &val);
void setattrs_maybe_cache(
ObjectContextRef obc,
- OpContext *op,
PGTransaction *t,
map<string, bufferlist> &attrs);
void rmattr_maybe_cache(
ObjectContextRef obc,
- OpContext *op,
PGTransaction *t,
const string &key);
int getattr_maybe_cache(
int getattrs_maybe_cache(
ObjectContextRef obc,
map<string, bufferlist> *out);
+
+public:
+ void set_dynamic_perf_stats_queries(
+ const std::list<OSDPerfMetricQuery> &queries) override;
+ void get_dynamic_perf_stats(DynamicPerfStats *stats) override;
+
+private:
+ DynamicPerfStats m_dynamic_perf_stats;
};
inline ostream& operator<<(ostream& out, const PrimaryLogPG::RepGather& repop)
<< " " << repop.v
<< " rep_tid=" << repop.rep_tid
<< " committed?=" << repop.all_committed
- << " applied?=" << repop.all_applied
<< " r=" << repop.r
<< ")";
return out;