-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
/*
* Ceph - scalable distributed file system
*
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
+ * License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
- *
+ *
*/
#ifndef CEPH_REPLICATEDPG_H
class CopyFromCallback;
class PromoteCallback;
+struct RefCountCallback;
class PrimaryLogPG;
class PGLSFilter;
class HitSet;
struct TierAgentState;
-class MOSDOp;
-class MOSDOpReply;
class OSDService;
void intrusive_ptr_add_ref(PrimaryLogPG *pg);
class PrimaryLogPG : public PG, public PGBackend::Listener {
friend class OSD;
friend class Watch;
+ friend class PrimaryLogScrub;
public:
MEMPOOL_CLASS_HELPERS();
version_t user_version; ///< The copy source's user version
bool should_requeue; ///< op should be requeued on cancel
- vector<snapid_t> snaps; ///< src's snaps (if clone)
+ std::vector<snapid_t> snaps; ///< src's snaps (if clone)
snapid_t snap_seq; ///< src's snap_seq (if head)
librados::snap_set_t snapset; ///< src snapset (if head)
bool mirror_snapset;
uint32_t flags; // object_copy_data_t::FLAG_*
uint32_t source_data_digest, source_omap_digest;
uint32_t data_digest, omap_digest;
- mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > reqids; // [(reqid, user_version)]
- mempool::osd_pglog::map<uint32_t, int> reqid_return_codes; // map reqids by index to error code
- map<string, bufferlist> attrs; // xattrs
+ mempool::osd_pglog::vector<std::pair<osd_reqid_t, version_t> > reqids; // [(reqid, user_version)]
+ mempool::osd_pglog::map<uint32_t, int> reqid_return_codes; // map reqids by index to error code
+ std::map<std::string, ceph::buffer::list, std::less<>> attrs; // xattrs
uint64_t truncate_seq;
uint64_t truncate_size;
bool is_data_digest() {
ceph_tid_t objecter_tid2;
object_copy_cursor_t cursor;
- map<string,bufferlist> attrs;
- bufferlist data;
- bufferlist omap_header;
- bufferlist omap_data;
+ std::map<std::string,ceph::buffer::list,std::less<>> attrs;
+ ceph::buffer::list data;
+ ceph::buffer::list omap_header;
+ ceph::buffer::list omap_data;
int rval;
object_copy_cursor_t temp_cursor;
unsigned src_obj_fadvise_flags;
unsigned dest_obj_fadvise_flags;
- map<uint64_t, CopyOpRef> chunk_cops;
+ std::map<uint64_t, CopyOpRef> chunk_cops;
int num_chunk;
bool failed;
uint64_t start_offset = 0;
uint64_t last_offset = 0;
- vector<OSDOp> chunk_ops;
-
+ std::vector<OSDOp> chunk_ops;
+
CopyOp(CopyCallback *cb_, ObjectContextRef _obc, hobject_t s,
object_locator_t l,
version_t v,
typedef boost::tuple<int, CopyResults*> CopyCallbackResults;
friend class CopyFromCallback;
- friend class CopyFromFinisher;
+ friend struct CopyFromFinisher;
friend class PromoteCallback;
- friend class PromoteFinisher;
-
+ friend struct PromoteFinisher;
+ friend struct C_gather;
+
struct ProxyReadOp {
OpRequestRef op;
hobject_t soid;
ceph_tid_t objecter_tid;
- vector<OSDOp> &ops;
+ std::vector<OSDOp> &ops;
version_t user_version;
int data_offset;
bool canceled; ///< true if canceled
- ProxyReadOp(OpRequestRef _op, hobject_t oid, vector<OSDOp>& _ops)
+ ProxyReadOp(OpRequestRef _op, hobject_t oid, std::vector<OSDOp>& _ops)
: op(_op), soid(oid),
objecter_tid(0), ops(_ops),
user_version(0), data_offset(0),
OpRequestRef op;
hobject_t soid;
ceph_tid_t objecter_tid;
- vector<OSDOp> &ops;
+ std::vector<OSDOp> &ops;
version_t user_version;
bool sent_reply;
utime_t mtime;
bool canceled;
osd_reqid_t reqid;
- ProxyWriteOp(OpRequestRef _op, hobject_t oid, vector<OSDOp>& _ops, osd_reqid_t _reqid)
+ ProxyWriteOp(OpRequestRef _op, hobject_t oid, std::vector<OSDOp>& _ops, osd_reqid_t _reqid)
: ctx(NULL), op(_op), soid(oid),
objecter_tid(0), ops(_ops),
user_version(0), sent_reply(false),
struct FlushOp {
ObjectContextRef obc; ///< obc we are flushing
OpRequestRef op; ///< initiating op
- list<OpRequestRef> dup_ops; ///< bandwagon jumpers
+ std::list<OpRequestRef> dup_ops; ///< bandwagon jumpers
version_t flushed_version; ///< user version we are flushing
ceph_tid_t objecter_tid; ///< copy-from request tid
int rval; ///< copy-from result
bool blocking; ///< whether we are blocking updates
bool removal; ///< we are removing the backend object
- boost::optional<std::function<void()>> on_flush; ///< callback, may be null
+ std::optional<std::function<void()>> on_flush; ///< callback, may be null
// for chunked object
- map<uint64_t, int> io_results;
- map<uint64_t, ceph_tid_t> io_tids;
+ std::map<uint64_t, int> io_results;
+ std::map<uint64_t, ceph_tid_t> io_tids;
uint64_t chunks;
FlushOp()
};
typedef std::shared_ptr<FlushOp> FlushOpRef;
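+ /// in-flight cls "gather" op: the initiating op/obc plus the objecter
+ /// tids of the reads it issued (tracked in cls_gather_ops)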
+ struct CLSGatherOp {
+ OpContext *ctx = nullptr;
+ ObjectContextRef obc;
+ OpRequestRef op;
+ std::vector<ceph_tid_t> objecter_tids;
+ int rval = 0;
+
+ CLSGatherOp(OpContext *ctx_, ObjectContextRef obc_, OpRequestRef op_)
+ : ctx(ctx_), obc(obc_), op(op_) {}
+ CLSGatherOp() {}
+ ~CLSGatherOp() {}
+ };
+
+ friend struct RefCountCallback;
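+ /// in-flight manifest/refcount op: per-chunk objecter tids and results,
+ /// plus the new manifest to install once every chunk completes
+ /// (tracked in manifest_ops)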
+ struct ManifestOp {
+ RefCountCallback *cb = nullptr;
+ ceph_tid_t objecter_tid = 0;
+ OpRequestRef op;
+ std::map<uint64_t, int> results;
+ std::map<uint64_t, ceph_tid_t> tids;
+ std::map<hobject_t, std::pair<uint64_t, uint64_t>> chunks;
+ uint64_t num_chunks = 0;
+ object_manifest_t new_manifest;
+ ObjectContextRef obc;
+
+
+ ManifestOp(ObjectContextRef obc, RefCountCallback* cb)
+ : cb(cb), obc(obc) {}
+ ManifestOp() = delete;
+ };
+ typedef std::shared_ptr<ManifestOp> ManifestOpRef;
+ std::map<hobject_t, ManifestOpRef> manifest_ops;
+
boost::scoped_ptr<PGBackend> pgbackend;
PGBackend *get_pgbackend() override {
return pgbackend.get();
pg_shard_t peer,
const hobject_t &oid,
const ObjectRecoveryInfo &recovery_info
- ) override;
+ ) override {
+ recovery_state.on_peer_recover(peer, oid, recovery_info.version);
+ }
void begin_peer_recover(
pg_shard_t peer,
- const hobject_t oid) override;
+ const hobject_t oid) override {
+ recovery_state.begin_peer_recover(peer, oid);
+ }
void on_global_recover(
const hobject_t &oid,
const object_stat_sum_t &stat_diff,
bool is_delete) override;
- void failed_push(const list<pg_shard_t> &from,
- const hobject_t &soid,
- const eversion_t &need = eversion_t()) override;
- void primary_failed(const hobject_t &soid) override;
- bool primary_error(const hobject_t& soid, eversion_t v) override;
+ void on_failed_pull(
+ const std::set<pg_shard_t> &from,
+ const hobject_t &soid,
+ const eversion_t &version) override;
void cancel_pull(const hobject_t &soid) override;
void apply_stats(
const hobject_t &soid,
const object_stat_sum_t &delta_stats) override;
- void on_primary_error(const hobject_t &oid, eversion_t v) override;
- void backfill_add_missing(const hobject_t &oid, eversion_t v) override;
+
+ bool primary_error(const hobject_t& soid, eversion_t v);
+
void remove_missing_object(const hobject_t &oid,
eversion_t v,
Context *on_complete) override;
GenContext<ThreadPool::TPHandle&> *c) override;
GenContext<ThreadPool::TPHandle&> *bless_unlocked_gencontext(
GenContext<ThreadPool::TPHandle&> *c) override;
-
+
void send_message(int to_osd, Message *m) override {
osd->send_message_osd_cluster(to_osd, m, get_osdmap_epoch());
}
OpRequestRef op) override {
osd->store->queue_transaction(ch, std::move(t), op);
}
- void queue_transactions(vector<ObjectStore::Transaction>& tls,
+ void queue_transactions(std::vector<ObjectStore::Transaction>& tls,
OpRequestRef op) override {
osd->store->queue_transactions(ch, tls, op, NULL);
}
epoch_t get_last_peering_reset_epoch() const override {
return get_last_peering_reset();
}
- const set<pg_shard_t> &get_acting_recovery_backfill_shards() const override {
- return acting_recovery_backfill;
+ const std::set<pg_shard_t> &get_acting_recovery_backfill_shards() const override {
+ return get_acting_recovery_backfill();
}
- const set<pg_shard_t> &get_acting_shards() const override {
- return actingset;
+ const std::set<pg_shard_t> &get_acting_shards() const override {
+ return recovery_state.get_actingset();
}
- const set<pg_shard_t> &get_backfill_shards() const override {
- return backfill_targets;
+ const std::set<pg_shard_t> &get_backfill_shards() const override {
+ return get_backfill_targets();
}
std::ostream& gen_dbg_prefix(std::ostream& out) const override {
return gen_prefix(out);
}
- const map<hobject_t, set<pg_shard_t>>
- &get_missing_loc_shards() const override {
- return missing_loc.get_missing_locs();
+ const HobjToShardSetMapping& get_missing_loc_shards() const override
+ {
+ return recovery_state.get_missing_loc().get_missing_locs();
}
- const map<pg_shard_t, pg_missing_t> &get_shard_missing() const override {
- return peer_missing;
+ const std::map<pg_shard_t, pg_missing_t> &get_shard_missing() const override {
+ return recovery_state.get_peer_missing();
}
using PGBackend::Listener::get_shard_missing;
- const map<pg_shard_t, pg_info_t> &get_shard_info() const override {
- return peer_info;
+ const std::map<pg_shard_t, pg_info_t> &get_shard_info() const override {
+ return recovery_state.get_peer_info();
}
- using PGBackend::Listener::get_shard_info;
+ using PGBackend::Listener::get_shard_info;
const pg_missing_tracker_t &get_local_missing() const override {
- return pg_log.get_missing();
+ return recovery_state.get_pg_log().get_missing();
}
const PGLog &get_log() const override {
- return pg_log;
+ return recovery_state.get_pg_log();
}
void add_local_next_event(const pg_log_entry_t& e) override {
- pg_log.missing_add_next_entry(e);
+ recovery_state.add_local_next_event(e);
}
bool pgb_is_primary() const override {
return is_primary();
ObjectContextRef get_obc(
const hobject_t &hoid,
- const map<string, bufferlist> &attrs) override {
+ const std::map<std::string, ceph::buffer::list, std::less<>> &attrs) override {
return get_object_context(hoid, true, &attrs);
}
release_object_locks(manager);
}
- bool pg_is_repair() override {
- return is_repair();
- }
void inc_osd_stat_repaired() override {
osd->inc_osd_stat_repaired();
}
void pgb_set_object_snap_mapping(
const hobject_t &soid,
- const set<snapid_t> &snaps,
+ const std::set<snapid_t> &snaps,
ObjectStore::Transaction *t) override {
return update_object_snap_mapping(t, soid, snaps);
}
}
void log_operation(
- const vector<pg_log_entry_t> &logv,
- const boost::optional<pg_hit_set_history_t> &hset_history,
+ std::vector<pg_log_entry_t>&& logv,
+ const std::optional<pg_hit_set_history_t> &hset_history,
const eversion_t &trim_to,
const eversion_t &roll_forward_to,
+ const eversion_t &min_last_complete_ondisk,
bool transaction_applied,
ObjectStore::Transaction &t,
bool async = false) override {
+ if (is_primary()) {
+ ceph_assert(trim_to <= recovery_state.get_last_update_ondisk());
+ }
if (hset_history) {
- info.hit_set = *hset_history;
+ recovery_state.update_hset(*hset_history);
+ }
+ if (transaction_applied) {
+ update_snap_map(logv, t);
}
- append_log(logv, trim_to, roll_forward_to, t, transaction_applied, async);
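+ // the entries being appended were already in the primary's projected
+ // log while in flight; trim them from it now that they land in the
+ // official log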
+ auto last = logv.rbegin();
+ if (is_primary() && last != logv.rend()) {
+ projected_log.skip_can_rollback_to_to_head();
+ projected_log.trim(cct, last->version, nullptr, nullptr, nullptr);
+ }
+ if (!is_primary() && !is_ec_pg()) {
+ replica_clear_repop_obc(logv, t);
+ }
+ recovery_state.append_log(
+ std::move(logv), trim_to, roll_forward_to, min_last_complete_ondisk,
+ t, transaction_applied, async);
}
+ void replica_clear_repop_obc(
+ const std::vector<pg_log_entry_t> &logv,
+ ObjectStore::Transaction &t);
+
void op_applied(const eversion_t &applied_version) override;
bool should_send_op(
bool pg_is_undersized() const override {
return is_undersized();
}
-
+
bool pg_is_repair() const override {
return is_repair();
}
void update_peer_last_complete_ondisk(
pg_shard_t fromosd,
eversion_t lcod) override {
- peer_last_complete_ondisk[fromosd] = lcod;
+ recovery_state.update_peer_last_complete_ondisk(fromosd, lcod);
}
void update_last_complete_ondisk(
eversion_t lcod) override {
- last_complete_ondisk = lcod;
+ recovery_state.update_last_complete_ondisk(lcod);
}
void update_stats(
const pg_stat_t &stat) override {
- info.stats = stat;
+ recovery_state.update_stats(
+ [&stat](auto &history, auto &stats) {
+ stats = stat;
+ return false;
+ });
}
void schedule_recovery_work(
- GenContext<ThreadPool::TPHandle&> *c) override;
+ GenContext<ThreadPool::TPHandle&> *c,
+ uint64_t cost) override;
pg_shard_t whoami_shard() const override {
return pg_whoami;
}
spg_t primary_spg_t() const override {
- return spg_t(info.pgid.pgid, primary.shard);
+ return spg_t(info.pgid.pgid, get_primary().shard);
}
pg_shard_t primary_shard() const override {
- return primary;
+ return get_primary();
+ }
+ uint64_t min_peer_features() const override {
+ return recovery_state.get_min_peer_features();
+ }
+ uint64_t min_upacting_features() const override {
+ return recovery_state.get_min_upacting_features();
+ }
+ void send_message_osd_cluster(
+ int peer, Message *m, epoch_t from_epoch) override {
+ osd->send_message_osd_cluster(peer, m, from_epoch);
}
-
void send_message_osd_cluster(
- int peer, Message *m, epoch_t from_epoch) override;
+ std::vector<std::pair<int, Message*>>& messages, epoch_t from_epoch) override {
+ osd->send_message_osd_cluster(messages, from_epoch);
+ }
void send_message_osd_cluster(
- Message *m, Connection *con) override;
+ MessageRef m, Connection *con) override {
+ osd->send_message_osd_cluster(std::move(m), con);
+ }
void send_message_osd_cluster(
- Message *m, const ConnectionRef& con) override;
+ Message *m, const ConnectionRef& con) override {
+ osd->send_message_osd_cluster(m, con);
+ }
ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch) override;
entity_name_t get_cluster_msgr_name() override {
return osd->get_cluster_msgr_name();
ceph_tid_t get_tid() override { return osd->get_tid(); }
- LogClientTemp clog_error() override { return osd->clog->error(); }
- LogClientTemp clog_warn() override { return osd->clog->warn(); }
+ OstreamTemp clog_error() override { return osd->clog->error(); }
+ OstreamTemp clog_warn() override { return osd->clog->warn(); }
+
+ /**
+ * a scrub-map arrived from a replica
+ */
+ void do_replica_scrub_map(OpRequestRef op);
struct watch_disconnect_t {
uint64_t cookie;
};
void complete_disconnect_watches(
ObjectContextRef obc,
- const list<watch_disconnect_t> &to_disconnect);
+ const std::list<watch_disconnect_t> &to_disconnect);
struct OpFinisher {
virtual ~OpFinisher() {
struct OpContext {
OpRequestRef op;
osd_reqid_t reqid;
- vector<OSDOp> *ops;
+ std::vector<OSDOp> *ops;
const ObjectState *obs; // Old objectstate
const SnapSet *snapset; // Old snapset
bool modify; // (force) modification (even if op_t is empty)
bool user_modify; // user-visible modification
bool undirty; // user explicitly un-dirtying this object
- bool cache_evict; ///< true if this is a cache eviction
- bool ignore_cache; ///< true if IGNORE_CACHE flag is set
+ bool cache_operation; ///< true if this is a cache operation (e.g. flush or evict)
+ bool ignore_cache; ///< true if IGNORE_CACHE flag is set
bool ignore_log_op_stats; // don't log op stats
bool update_log_only; ///< this is a write that returned an error - just record in pg log for dup detection
+ ObjectCleanRegions clean_regions; ///< extents/omap dirtied by this op, recorded for partial recovery
// side effects
- list<pair<watch_info_t,bool> > watch_connects; ///< new watch + will_ping flag
- list<watch_disconnect_t> watch_disconnects; ///< old watch + send_discon
- list<notify_info_t> notifies;
+ std::list<std::pair<watch_info_t,bool> > watch_connects; ///< new watch + will_ping flag
+ std::list<watch_disconnect_t> watch_disconnects; ///< old watch + send_discon
+ std::list<notify_info_t> notifies;
struct NotifyAck {
- boost::optional<uint64_t> watch_cookie;
+ std::optional<uint64_t> watch_cookie;
uint64_t notify_id;
- bufferlist reply_bl;
+ ceph::buffer::list reply_bl;
explicit NotifyAck(uint64_t notify_id) : notify_id(notify_id) {}
- NotifyAck(uint64_t notify_id, uint64_t cookie, bufferlist& rbl)
+ NotifyAck(uint64_t notify_id, uint64_t cookie, ceph::buffer::list& rbl)
: watch_cookie(cookie), notify_id(notify_id) {
- reply_bl.claim(rbl);
+ reply_bl = std::move(rbl);
}
};
- list<NotifyAck> notify_acks;
+ std::list<NotifyAck> notify_acks;
uint64_t bytes_written, bytes_read;
int processed_subop_count = 0;
PGTransactionUPtr op_t;
- vector<pg_log_entry_t> log;
- boost::optional<pg_hit_set_history_t> updated_hset_history;
+ std::vector<pg_log_entry_t> log;
+ std::optional<pg_hit_set_history_t> updated_hset_history;
interval_set<uint64_t> modified_ranges;
ObjectContextRef obc;
ObjectContextRef head_obc; // if we also update snapset (see trim_object)
// FIXME: we may want to kill this msgr hint off at some point!
- boost::optional<int> data_off = boost::none;
+ std::optional<int> data_off = std::nullopt;
MOSDOpReply *reply;
int num_read; ///< count read ops
int num_write; ///< count update ops
- mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > extra_reqids;
+ mempool::osd_pglog::vector<std::pair<osd_reqid_t, version_t> > extra_reqids;
mempool::osd_pglog::map<uint32_t, int> extra_reqid_return_codes;
hobject_t new_temp_oid, discard_temp_oid; ///< temp objects we should start/stop tracking
- list<std::function<void()>> on_applied;
- list<std::function<void()>> on_committed;
- list<std::function<void()>> on_finish;
- list<std::function<void()>> on_success;
+ std::list<std::function<void()>> on_applied;
+ std::list<std::function<void()>> on_committed;
+ std::list<std::function<void()>> on_finish;
+ std::list<std::function<void()>> on_success;
template <typename F>
void register_on_finish(F &&f) {
on_finish.emplace_back(std::forward<F>(f));
bool sent_reply = false;
// pending async reads <off, len, op_flags> -> <outbl, outr>
- list<pair<boost::tuple<uint64_t, uint64_t, unsigned>,
- pair<bufferlist*, Context*> > > pending_async_reads;
+ std::list<std::pair<boost::tuple<uint64_t, uint64_t, unsigned>,
+ std::pair<ceph::buffer::list*, Context*> > > pending_async_reads;
int inflightreads;
friend struct OnReadComplete;
void start_async_reads(PrimaryLogPG *pg);
return inflightreads == 0;
}
- ObjectContext::RWState::State lock_type;
+ RWState::State lock_type;
ObcLockManager lock_manager;
std::map<int, std::unique_ptr<OpFinisher>> op_finishers;
OpContext(const OpContext& other);
const OpContext& operator=(const OpContext& other);
- OpContext(OpRequestRef _op, osd_reqid_t _reqid, vector<OSDOp>* _ops,
+ OpContext(OpRequestRef _op, osd_reqid_t _reqid, std::vector<OSDOp>* _ops,
ObjectContextRef& obc,
PrimaryLogPG *_pg) :
op(_op), reqid(_reqid), ops(_ops),
obs(&obc->obs),
snapset(0),
new_obs(obs->oi, obs->exists),
- modify(false), user_modify(false), undirty(false), cache_evict(false),
+ modify(false), user_modify(false), undirty(false), cache_operation(false),
ignore_cache(false), ignore_log_op_stats(false), update_log_only(false),
bytes_written(0), bytes_read(0), user_at_version(0),
current_osd_subop_num(0),
num_write(0),
sent_reply(false),
inflightreads(0),
- lock_type(ObjectContext::RWState::RWNONE) {
+ lock_type(RWState::RWNONE) {
if (obc->ssc) {
new_snapset = obc->ssc->snapset;
snapset = &obc->ssc->snapset;
}
}
OpContext(OpRequestRef _op, osd_reqid_t _reqid,
- vector<OSDOp>* _ops, PrimaryLogPG *_pg) :
+ std::vector<OSDOp>* _ops, PrimaryLogPG *_pg) :
op(_op), reqid(_reqid), ops(_ops), obs(NULL), snapset(0),
- modify(false), user_modify(false), undirty(false), cache_evict(false),
+ modify(false), user_modify(false), undirty(false), cache_operation(false),
ignore_cache(false), ignore_log_op_stats(false), update_log_only(false),
bytes_written(0), bytes_read(0), user_at_version(0),
current_osd_subop_num(0),
num_read(0),
num_write(0),
inflightreads(0),
- lock_type(ObjectContext::RWState::RWNONE) {}
+ lock_type(RWState::RWNONE) {}
void reset_obs(ObjectContextRef obc) {
new_obs = ObjectState(obc->obs.oi, obc->obs.exists);
if (obc->ssc) {
ceph_assert(!op_t);
if (reply)
reply->put();
- for (list<pair<boost::tuple<uint64_t, uint64_t, unsigned>,
- pair<bufferlist*, Context*> > >::iterator i =
+ for (std::list<std::pair<boost::tuple<uint64_t, uint64_t, unsigned>,
+ std::pair<ceph::buffer::list*, Context*> > >::iterator i =
pending_async_reads.begin();
i != pending_async_reads.end();
pending_async_reads.erase(i++)) {
bool rep_aborted;
bool all_committed;
-
+
utime_t start;
-
+
eversion_t pg_local_last_complete;
ObcLockManager lock_manager;
- list<std::function<void()>> on_committed;
- list<std::function<void()>> on_success;
- list<std::function<void()>> on_finish;
-
+ std::list<std::function<void()>> on_committed;
+ std::list<std::function<void()>> on_success;
+ std::list<std::function<void()>> on_finish;
+
RepGather(
OpContext *c, ceph_tid_t rt,
eversion_t lc) :
op(c->op),
queue_item(this),
nref(1),
- rep_tid(rt),
+ rep_tid(rt),
rep_aborted(false),
all_committed(false),
pg_local_last_complete(lc),
RepGather(
ObcLockManager &&manager,
OpRequestRef &&o,
- boost::optional<std::function<void(void)> > &&on_complete,
+ std::optional<std::function<void(void)> > &&on_complete,
ceph_tid_t rt,
eversion_t lc,
int r) :
* @param ctx [in,out] ctx to get locks for
* @return true on success, false if we are queued
*/
- bool get_rw_locks(bool write_ordered, OpContext *ctx) {
- /* If head_obc, !obc->obs->exists and we will always take the
- * snapdir lock *before* the head lock. Since all callers will do
- * this (read or write) if we get the first we will be guaranteed
- * to get the second.
- */
- if (write_ordered && ctx->op->may_read()) {
- ctx->lock_type = ObjectContext::RWState::RWEXCL;
- } else if (write_ordered) {
- ctx->lock_type = ObjectContext::RWState::RWWRITE;
- } else {
- ceph_assert(ctx->op->may_read());
- ctx->lock_type = ObjectContext::RWState::RWREAD;
- }
-
- if (ctx->head_obc) {
- ceph_assert(!ctx->obc->obs.exists);
- if (!ctx->lock_manager.get_lock_type(
- ctx->lock_type,
- ctx->head_obc->obs.oi.soid,
- ctx->head_obc,
- ctx->op)) {
- ctx->lock_type = ObjectContext::RWState::RWNONE;
- return false;
- }
- }
- if (ctx->lock_manager.get_lock_type(
- ctx->lock_type,
- ctx->obc->obs.oi.soid,
- ctx->obc,
- ctx->op)) {
- return true;
- } else {
- ceph_assert(!ctx->head_obc);
- ctx->lock_type = ObjectContext::RWState::RWNONE;
- return false;
- }
- }
+ bool get_rw_locks(bool write_ordered, OpContext *ctx);
/**
* Cleans up OpContext
* Releases locks
*
* @param manager [in] manager with locks to release
+ *
+ * (moved to .cc due to scrubber access)
*/
- void release_object_locks(
- ObcLockManager &lock_manager) {
- list<pair<ObjectContextRef, list<OpRequestRef> > > to_req;
- bool requeue_recovery = false;
- bool requeue_snaptrim = false;
- lock_manager.put_locks(
- &to_req,
- &requeue_recovery,
- &requeue_snaptrim);
- if (requeue_recovery)
- queue_recovery();
- if (requeue_snaptrim)
- snap_trimmer_machine.process_event(TrimWriteUnblocked());
-
- if (!to_req.empty()) {
- // requeue at front of scrub blocking queue if we are blocked by scrub
- for (auto &&p: to_req) {
- if (write_blocked_by_scrub(p.first->obs.oi.soid.get_head())) {
- for (auto& op : p.second) {
- op->mark_delayed("waiting for scrub");
- }
-
- waiting_for_scrub.splice(
- waiting_for_scrub.begin(),
- p.second,
- p.second.begin(),
- p.second.end());
- } else {
- requeue_ops(p.second);
- }
- }
- }
- }
+ void release_object_locks(ObcLockManager &lock_manager);
// replica ops
// [primary|tail]
void issue_repop(RepGather *repop, OpContext *ctx);
RepGather *new_repop(
OpContext *ctx,
- ObjectContextRef obc,
ceph_tid_t rep_tid);
boost::intrusive_ptr<RepGather> new_repop(
eversion_t version,
int r,
ObcLockManager &&manager,
OpRequestRef &&op,
- boost::optional<std::function<void(void)> > &&on_complete);
+ std::optional<std::function<void(void)> > &&on_complete);
void remove_repop(RepGather *repop);
OpContextUPtr simple_opc_create(ObjectContextRef obc);
void submit_log_entries(
const mempool::osd_pglog::list<pg_log_entry_t> &entries,
ObcLockManager &&manager,
- boost::optional<std::function<void(void)> > &&on_complete,
+ std::optional<std::function<void(void)> > &&on_complete,
OpRequestRef op = OpRequestRef(),
int r = 0);
struct LogUpdateCtx {
boost::intrusive_ptr<RepGather> repop;
- set<pg_shard_t> waiting_on;
+ std::set<pg_shard_t> waiting_on;
};
void cancel_log_updates();
- map<ceph_tid_t, LogUpdateCtx> log_entry_update_waiting_on;
+ std::map<ceph_tid_t, LogUpdateCtx> log_entry_update_waiting_on;
// hot/cold tracking
/// true if we can send an ondisk/commit for v
bool already_complete(eversion_t v);
- /// true if we can send an ack for v
- bool already_ack(eversion_t v);
// projected object info
SharedLRU<hobject_t, ObjectContext> object_contexts;
- // map from oid.snapdir() to SnapSetContext *
- map<hobject_t, SnapSetContext*> snapset_contexts;
- Mutex snapset_contexts_lock;
+ // map from oid.snapdir() to SnapSetContext *
+ std::map<hobject_t, SnapSetContext*> snapset_contexts;
+ ceph::mutex snapset_contexts_lock =
+ ceph::make_mutex("PrimaryLogPG::snapset_contexts_lock");
// debug order that client ops are applied
- map<hobject_t, map<client_t, ceph_tid_t>> debug_op_order;
+ std::map<hobject_t, std::map<client_t, ceph_tid_t>> debug_op_order;
void populate_obc_watchers(ObjectContextRef obc);
- void check_blacklisted_obc_watchers(ObjectContextRef obc);
- void check_blacklisted_watchers() override;
- void get_watchers(list<obj_watch_item_t> *ls) override;
- void get_obc_watchers(ObjectContextRef obc, list<obj_watch_item_t> &pg_watchers);
+ void check_blocklisted_obc_watchers(ObjectContextRef obc);
+ void check_blocklisted_watchers() override;
+ void get_watchers(std::list<obj_watch_item_t> *ls) override;
+ void get_obc_watchers(ObjectContextRef obc, std::list<obj_watch_item_t> &pg_watchers);
public:
void handle_watch_timeout(WatchRef watch);
protected:
ObjectContextRef get_object_context(
const hobject_t& soid,
bool can_create,
- const map<string, bufferlist> *attrs = 0
+ const std::map<std::string, ceph::buffer::list, std::less<>> *attrs = 0
);
void context_registry_on_change();
SnapSetContext *get_snapset_context(
const hobject_t& oid,
bool can_create,
- const map<string, bufferlist> *attrs = 0,
+ const std::map<std::string, ceph::buffer::list, std::less<>> *attrs = 0,
bool oid_existed = true //indicate whether this oid existed in the backend
);
void register_snapset_context(SnapSetContext *ssc) {
_register_snapset_context(ssc);
}
void _register_snapset_context(SnapSetContext *ssc) {
- ceph_assert(snapset_contexts_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(snapset_contexts_lock));
if (!ssc->registered) {
ceph_assert(snapset_contexts.count(ssc->oid) == 0);
ssc->registered = true;
}
void put_snapset_context(SnapSetContext *ssc);
- map<hobject_t, ObjectContextRef> recovering;
+ std::map<hobject_t, ObjectContextRef> recovering;
/*
* Backfill
* - are not included in pg stats (yet)
* - have their stats in pending_backfill_updates on the primary
*/
- set<hobject_t> backfills_in_flight;
- map<hobject_t, pg_stat_t> pending_backfill_updates;
-
- void dump_recovery_info(Formatter *f) const override {
- f->open_array_section("backfill_targets");
- for (set<pg_shard_t>::const_iterator p = backfill_targets.begin();
- p != backfill_targets.end(); ++p)
- f->dump_stream("replica") << *p;
- f->close_section();
+ std::set<hobject_t> backfills_in_flight;
+ std::map<hobject_t, pg_stat_t> pending_backfill_updates;
+
+ void dump_recovery_info(ceph::Formatter *f) const override {
f->open_array_section("waiting_on_backfill");
- for (set<pg_shard_t>::const_iterator p = waiting_on_backfill.begin();
+ for (std::set<pg_shard_t>::const_iterator p = waiting_on_backfill.begin();
p != waiting_on_backfill.end(); ++p)
f->dump_stream("osd") << *p;
f->close_section();
}
{
f->open_array_section("peer_backfill_info");
- for (map<pg_shard_t, BackfillInterval>::const_iterator pbi =
+ for (std::map<pg_shard_t, BackfillInterval>::const_iterator pbi =
peer_backfill_info.begin();
pbi != peer_backfill_info.end(); ++pbi) {
f->dump_stream("osd") << pbi->first;
}
{
f->open_array_section("backfills_in_flight");
- for (set<hobject_t>::const_iterator i = backfills_in_flight.begin();
+ for (std::set<hobject_t>::const_iterator i = backfills_in_flight.begin();
i != backfills_in_flight.end();
++i) {
f->dump_stream("object") << *i;
}
{
f->open_array_section("recovering");
- for (map<hobject_t, ObjectContextRef>::const_iterator i = recovering.begin();
+ for (std::map<hobject_t, ObjectContextRef>::const_iterator i = recovering.begin();
i != recovering.end();
++i) {
f->dump_stream("object") << i->first;
PGBackend::RecoveryHandle *h,
bool *work_started);
- void finish_degraded_object(const hobject_t& oid) override;
+ void finish_degraded_object(const hobject_t oid);
// Cancels/resets pulls from peer
void check_recovery_sources(const OSDMapRef& map) override;
void _make_clone(
OpContext *ctx,
PGTransaction* t,
- ObjectContextRef obc,
+ ObjectContextRef clone_obc,
const hobject_t& head, const hobject_t& coid,
object_info_t *poi);
void execute_ctx(OpContext *ctx);
- void finish_ctx(OpContext *ctx, int log_op_type);
+ void finish_ctx(OpContext *ctx, int log_op_type, int result=0);
void reply_ctx(OpContext *ctx, int err);
- void reply_ctx(OpContext *ctx, int err, eversion_t v, version_t uv);
void make_writeable(OpContext *ctx);
void log_op_stats(const OpRequest& op, uint64_t inb, uint64_t outb);
void do_cache_redirect(OpRequestRef op);
/**
* This function attempts to start a promote. Either it succeeds,
- * or places op on a wait list. If op is null, failure means that
+ * or places op on a wait list. If op is null, failure means that
* this is a noop. If a future user wants to be able to distinguish
* these cases, a return value should be added.
*/
);
int prepare_transaction(OpContext *ctx);
- list<pair<OpRequestRef, OpContext*> > in_progress_async_reads;
+ std::list<std::pair<OpRequestRef, OpContext*> > in_progress_async_reads;
void complete_read_ctx(int result, OpContext *ctx);
-
+
// pg on-disk content
void check_local() override;
hobject_t earliest_peer_backfill() const;
bool all_peer_done() const;
/**
- * @param work_started will be set to true if recover_backfill got anywhere
+ * @param work_started will be set to true if recover_backfill got anywhere
* @returns the number of operations started
*/
uint64_t recover_backfill(uint64_t max, ThreadPool::TPHandle &handle,
/**
* scan a (hash) range of objects in the current pg
*
- * @begin first item should be >= this value
* @min return at least this many items, unless we are done
* @max return no more than this many items
- * @bi [out] resulting map of objects to eversion_t's
+ * @bi.begin first item should be >= this value
+ * @bi [out] resulting map of objects to eversion_t's
*/
void scan_range(
int min, int max, BackfillInterval *bi,
int prep_backfill_object_push(
hobject_t oid, eversion_t v, ObjectContextRef obc,
- vector<pg_shard_t> peers,
+ std::vector<pg_shard_t> peers,
PGBackend::RecoveryHandle *h);
void send_remove_op(const hobject_t& oid, eversion_t v, pg_shard_t peer);
void recover_got(hobject_t oid, eversion_t v);
// -- copyfrom --
- map<hobject_t, CopyOpRef> copy_ops;
+ std::map<hobject_t, CopyOpRef> copy_ops;
- int do_copy_get(OpContext *ctx, bufferlist::const_iterator& bp, OSDOp& op,
+ int do_copy_get(OpContext *ctx, ceph::buffer::list::const_iterator& bp, OSDOp& op,
ObjectContextRef& obc);
int finish_copy_get();
void _copy_some(ObjectContextRef obc, CopyOpRef cop);
void finish_copyfrom(CopyFromCallback *cb);
void finish_promote(int r, CopyResults *results, ObjectContextRef obc);
- void cancel_copy(CopyOpRef cop, bool requeue, vector<ceph_tid_t> *tids);
- void cancel_copy_ops(bool requeue, vector<ceph_tid_t> *tids);
+ void cancel_copy(CopyOpRef cop, bool requeue, std::vector<ceph_tid_t> *tids);
+ void cancel_copy_ops(bool requeue, std::vector<ceph_tid_t> *tids);
friend struct C_Copyfrom;
// -- flush --
- map<hobject_t, FlushOpRef> flush_ops;
+ std::map<hobject_t, FlushOpRef> flush_ops;
/// start_flush takes ownership of on_flush iff ret == -EINPROGRESS
int start_flush(
OpRequestRef op, ObjectContextRef obc,
bool blocking, hobject_t *pmissing,
- boost::optional<std::function<void()>> &&on_flush);
+ std::optional<std::function<void()>> &&on_flush,
+ bool force_dedup = false);
void finish_flush(hobject_t oid, ceph_tid_t tid, int r);
int try_flush_mark_clean(FlushOpRef fop);
- void cancel_flush(FlushOpRef fop, bool requeue, vector<ceph_tid_t> *tids);
- void cancel_flush_ops(bool requeue, vector<ceph_tid_t> *tids);
+ void cancel_flush(FlushOpRef fop, bool requeue, std::vector<ceph_tid_t> *tids);
+ void cancel_flush_ops(bool requeue, std::vector<ceph_tid_t> *tids);
/// @return false if the clone has been evicted
bool is_present_clone(hobject_t coid);
friend struct C_Flush;
+ // -- cls_gather --
+ std::map<hobject_t, CLSGatherOp> cls_gather_ops;
+ void cancel_cls_gather(std::map<hobject_t,CLSGatherOp>::iterator iter, bool requeue, std::vector<ceph_tid_t> *tids);
+ void cancel_cls_gather_ops(bool requeue, std::vector<ceph_tid_t> *tids);
+
// -- scrub --
bool _range_available_for_scrub(
const hobject_t &begin, const hobject_t &end) override;
- void scrub_snapshot_metadata(
- ScrubMap &map,
- const std::map<hobject_t,
- pair<boost::optional<uint32_t>,
- boost::optional<uint32_t>>> &missing_digest) override;
- void _scrub_clear_state() override;
- void _scrub_finish() override;
- object_stat_collection_t scrub_cstat;
void _split_into(pg_t child_pgid, PG *child,
unsigned split_bits) override;
void apply_and_flush_repops(bool requeue);
- void calc_trim_to() override;
- void calc_trim_to_aggressive() override;
- int do_xattr_cmp_u64(int op, __u64 v1, bufferlist& xattr);
- int do_xattr_cmp_str(int op, string& v1s, bufferlist& xattr);
+ int do_xattr_cmp_u64(int op, uint64_t v1, ceph::buffer::list& xattr);
+ int do_xattr_cmp_str(int op, std::string& v1s, ceph::buffer::list& xattr);
// -- checksum --
- int do_checksum(OpContext *ctx, OSDOp& osd_op, bufferlist::const_iterator *bl_it);
+ int do_checksum(OpContext *ctx, OSDOp& osd_op, ceph::buffer::list::const_iterator *bl_it);
int finish_checksum(OSDOp& osd_op, Checksummer::CSumType csum_type,
- bufferlist::const_iterator *init_value_bl_it,
- const bufferlist &read_bl);
+ ceph::buffer::list::const_iterator *init_value_bl_it,
+ const ceph::buffer::list &read_bl);
- friend class C_ChecksumRead;
+ friend struct C_ChecksumRead;
int do_extent_cmp(OpContext *ctx, OSDOp& osd_op);
- int finish_extent_cmp(OSDOp& osd_op, const bufferlist &read_bl);
+ int finish_extent_cmp(OSDOp& osd_op, const ceph::buffer::list &read_bl);
- friend class C_ExtentCmpRead;
+ friend struct C_ExtentCmpRead;
int do_read(OpContext *ctx, OSDOp& osd_op);
int do_sparse_read(OpContext *ctx, OSDOp& osd_op);
int do_writesame(OpContext *ctx, OSDOp& osd_op);
- bool pgls_filter(PGLSFilter *filter, hobject_t& sobj, bufferlist& outdata);
- int get_pgls_filter(bufferlist::const_iterator& iter, PGLSFilter **pfilter);
+ bool pgls_filter(const PGLSFilter& filter, const hobject_t& sobj);
+
+ std::pair<int, std::unique_ptr<const PGLSFilter>> get_pgls_filter(
+ ceph::buffer::list::const_iterator& iter);
- map<hobject_t, list<OpRequestRef>> in_progress_proxy_ops;
+ std::map<hobject_t, std::list<OpRequestRef>> in_progress_proxy_ops;
void kick_proxy_ops_blocked(hobject_t& soid);
- void cancel_proxy_ops(bool requeue, vector<ceph_tid_t> *tids);
+ void cancel_proxy_ops(bool requeue, std::vector<ceph_tid_t> *tids);
// -- proxyread --
- map<ceph_tid_t, ProxyReadOpRef> proxyread_ops;
+ std::map<ceph_tid_t, ProxyReadOpRef> proxyread_ops;
void do_proxy_read(OpRequestRef op, ObjectContextRef obc = NULL);
void finish_proxy_read(hobject_t oid, ceph_tid_t tid, int r);
- void cancel_proxy_read(ProxyReadOpRef prdop, vector<ceph_tid_t> *tids);
+ void cancel_proxy_read(ProxyReadOpRef prdop, std::vector<ceph_tid_t> *tids);
friend struct C_ProxyRead;
// -- proxywrite --
- map<ceph_tid_t, ProxyWriteOpRef> proxywrite_ops;
+ std::map<ceph_tid_t, ProxyWriteOpRef> proxywrite_ops;
void do_proxy_write(OpRequestRef op, ObjectContextRef obc = NULL);
void finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r);
- void cancel_proxy_write(ProxyWriteOpRef pwop, vector<ceph_tid_t> *tids);
+ void cancel_proxy_write(ProxyWriteOpRef pwop, std::vector<ceph_tid_t> *tids);
friend struct C_ProxyWrite_Commit;
// -- chunkop --
- void do_proxy_chunked_op(OpRequestRef op, const hobject_t& missing_oid,
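+ /// refcount operation to request against a manifest target (see refcount_manifest)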
+ enum class refcount_t {
+ INCREMENT_REF,
+ DECREMENT_REF,
+ CREATE_OR_GET_REF,
+ };
+ void do_proxy_chunked_op(OpRequestRef op, const hobject_t& missing_oid,
ObjectContextRef obc, bool write_ordered);
void do_proxy_chunked_read(OpRequestRef op, ObjectContextRef obc, int op_index,
uint64_t chunk_index, uint64_t req_offset, uint64_t req_length,
void process_copy_chunk_manifest(hobject_t oid, ceph_tid_t tid, int r, uint64_t offset);
void finish_promote_manifest(int r, CopyResults *results, ObjectContextRef obc);
void cancel_and_requeue_proxy_ops(hobject_t oid);
- int do_manifest_flush(OpRequestRef op, ObjectContextRef obc, FlushOpRef manifest_fop,
- uint64_t start_offset, bool block);
- int start_manifest_flush(OpRequestRef op, ObjectContextRef obc, bool blocking,
- boost::optional<std::function<void()>> &&on_flush);
- void finish_manifest_flush(hobject_t oid, ceph_tid_t tid, int r, ObjectContextRef obc,
- uint64_t last_offset);
- void handle_manifest_flush(hobject_t oid, ceph_tid_t tid, int r,
- uint64_t offset, uint64_t last_offset, epoch_t lpr);
- void refcount_manifest(ObjectContextRef obc, object_locator_t oloc, hobject_t soid,
- SnapContext snapc, bool get, Context *cb, uint64_t offset);
+ void cancel_manifest_ops(bool requeue, std::vector<ceph_tid_t> *tids);
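+ /// issue a refcount get/put (per refcount_t) against tgt_soid on behalf
+ /// of src_soid; returns the objecter tid, cb (if any) fires on completion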
+ ceph_tid_t refcount_manifest(hobject_t src_soid, hobject_t tgt_soid, refcount_t type,
+ Context *cb, std::optional<bufferlist> chunk);
+ void dec_all_refcount_manifest(const object_info_t& oi, OpContext* ctx);
+ void dec_refcount(const hobject_t& soid, const object_ref_delta_t& refs);
+ void update_chunk_map_by_dirty(OpContext* ctx);
+ void dec_refcount_by_dirty(OpContext* ctx);
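+ /// neighboring clones' chunk maps are consulted when computing refcount deltas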
+ ObjectContextRef get_prev_clone_obc(ObjectContextRef obc);
+ bool recover_adjacent_clones(ObjectContextRef obc, OpRequestRef op);
+ void get_adjacent_clones(ObjectContextRef src_obc,
+ ObjectContextRef& _l, ObjectContextRef& _g);
+ bool inc_refcount_by_set(OpContext* ctx, object_manifest_t& tgt,
+ OSDOp& osd_op);
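+ /// carve the object into content-defined chunks (CDC) for dedup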
+ int do_cdc(const object_info_t& oi, std::map<uint64_t, chunk_info_t>& chunk_map,
+ std::map<uint64_t, bufferlist>& chunks);
+ int start_dedup(OpRequestRef op, ObjectContextRef obc);
+ std::pair<int, hobject_t> get_fpoid_from_chunk(const hobject_t soid, bufferlist& chunk);
+ int finish_set_dedup(hobject_t oid, int r, ceph_tid_t tid, uint64_t offset);
+ int finish_set_manifest_refcount(hobject_t oid, int r, ceph_tid_t tid, uint64_t offset);
friend struct C_ProxyChunkRead;
friend class PromoteManifestCallback;
- friend class C_CopyChunk;
- friend struct C_ManifestFlush;
+ friend struct C_CopyChunk;
friend struct RefCountCallback;
+ friend struct C_SetDedupChunks;
+ friend struct C_SetManifestRefCountDone;
+ friend struct SetManifestFinisher;
public:
PrimaryLogPG(OSDService *o, OSDMapRef curmap,
const PGPool &_pool,
- const map<string,string>& ec_profile,
+ const std::map<std::string,std::string>& ec_profile,
spg_t p);
- ~PrimaryLogPG() override {}
-
- int do_command(
- cmdmap_t cmdmap,
- ostream& ss,
- bufferlist& idata,
- bufferlist& odata,
- ConnectionRef conn,
- ceph_tid_t tid) override;
-
- void clear_cache();
- int get_cache_obj_count() {
+ ~PrimaryLogPG() override;
+
+ void do_command(
+ const std::string_view& prefix,
+ const cmdmap_t& cmdmap,
+ const ceph::buffer::list& idata,
+ std::function<void(int,const std::string&,ceph::buffer::list&)> on_finish) override;
+
+ void clear_cache() override;
+ int get_cache_obj_count() override {
return object_contexts.get_count();
}
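+ /// which OSD op-queue shard this PG hashes to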
+ unsigned get_pg_shard() const {
+ return info.pgid.hash_to_shard(osd->get_num_shards());
+ }
void do_request(
OpRequestRef& op,
ThreadPool::TPHandle &handle) override;
void do_op(OpRequestRef& op);
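+ /// ctx_for_op_returns, when non-null, supplies per-op return values for the error reply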
void record_write_error(OpRequestRef op, const hobject_t &soid,
- MOSDOpReply *orig_reply, int r);
+ MOSDOpReply *orig_reply, int r,
+ OpContext *ctx_for_op_returns=nullptr);
void do_pg_op(OpRequestRef op);
void do_scan(
OpRequestRef op,
void handle_backoff(OpRequestRef& op);
- int trim_object(bool first, const hobject_t &coid, OpContextUPtr *ctxp);
+ int trim_object(bool first, const hobject_t &coid, snapid_t snap_to_trim,
+ OpContextUPtr *ctxp);
void snap_trimmer(epoch_t e) override;
void kick_snap_trim() override;
void snap_trimmer_scrub_complete() override;
- int do_osd_ops(OpContext *ctx, vector<OSDOp>& ops);
+ int do_osd_ops(OpContext *ctx, std::vector<OSDOp>& ops);
- int _get_tmap(OpContext *ctx, bufferlist *header, bufferlist *vals);
+ int _get_tmap(OpContext *ctx, ceph::buffer::list *header, ceph::buffer::list *vals);
int do_tmap2omap(OpContext *ctx, unsigned flags);
- int do_tmapup(OpContext *ctx, bufferlist::const_iterator& bp, OSDOp& osd_op);
- int do_tmapup_slow(OpContext *ctx, bufferlist::const_iterator& bp, OSDOp& osd_op, bufferlist& bl);
+ int do_tmapup(OpContext *ctx, ceph::buffer::list::const_iterator& bp, OSDOp& osd_op);
+ int do_tmapup_slow(OpContext *ctx, ceph::buffer::list::const_iterator& bp, OSDOp& osd_op, ceph::buffer::list& bl);
void do_osd_op_effects(OpContext *ctx, const ConnectionRef& conn);
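+ /// start a cls "gather": read src_objs from the given pool via the
+ /// objecter, then re-invoke the cls method with the results (state is
+ /// tracked by CLSGatherOp)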
+ int start_cls_gather(OpContext *ctx, std::map<std::string, bufferlist> *src_objs, const std::string& pool,
+ const char *cls, const char *method, bufferlist& inbl);
+
private:
- int do_scrub_ls(MOSDOp *op, OSDOp *osd_op);
- hobject_t earliest_backfill() const;
+ int do_scrub_ls(const MOSDOp *op, OSDOp *osd_op);
bool check_src_targ(const hobject_t& soid, const hobject_t& toid) const;
uint64_t temp_seq; ///< last id for naming temp objects
/// generate a new temp object name (for recovery)
hobject_t get_temp_recovery_object(const hobject_t& target,
eversion_t version) override;
- int get_recovery_op_priority() const {
- int64_t pri = 0;
- pool.info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri);
- return pri > 0 ? pri : cct->_conf->osd_recovery_op_priority;
- }
- void log_missing(unsigned missing,
- const boost::optional<hobject_t> &head,
- LogChannelRef clog,
- const spg_t &pgid,
- const char *func,
- const char *mode,
- bool allow_incomplete_clones);
- unsigned process_clones_to(const boost::optional<hobject_t> &head,
- const boost::optional<SnapSet> &snapset,
- LogChannelRef clog,
- const spg_t &pgid,
- const char *mode,
- bool allow_incomplete_clones,
- boost::optional<snapid_t> target,
- vector<snapid_t>::reverse_iterator *curclone,
- inconsistent_snapset_wrapper &snap_error);
-
public:
coll_t get_coll() {
return coll;
int split_bits,
int seed,
const pg_pool_t *pool,
- ObjectStore::Transaction *t) override {
+ ObjectStore::Transaction &t) override {
coll_t target = coll_t(child);
- PG::_create(*t, child, split_bits);
- t->split_collection(
+ create_pg_collection(t, child, split_bits);
+ t.split_collection(
coll,
split_bits,
seed,
target);
- PG::_init(*t, child, pool);
+ init_pg_ondisk(t, child, pool);
}
private:
explicit SnapTrimmer(PrimaryLogPG *pg) : pg(pg) {}
void log_enter(const char *state_name);
void log_exit(const char *state_name, utime_t duration);
- bool permit_trim() {
- return
- pg->is_clean() &&
- !pg->scrubber.active &&
- !pg->snap_trimq.empty();
- }
+ bool permit_trim();
bool can_trim() {
return
permit_trim() &&
boost::statechart::transition< Reset, NotTrimming >
> reactions;
- set<hobject_t> in_flight;
+ std::set<hobject_t> in_flight;
snapid_t snap_to_trim;
explicit Trimming(my_context ctx)
: my_base(ctx),
- NamedState(context< SnapTrimmer >().pg, "Trimming") {
+ NamedState(nullptr, "Trimming") {
context< SnapTrimmer >().log_enter(state_name);
ceph_assert(context< SnapTrimmer >().permit_trim());
ceph_assert(in_flight.empty());
Context *wakeup = nullptr;
explicit WaitTrimTimer(my_context ctx)
: my_base(ctx),
- NamedState(context< SnapTrimmer >().pg, "Trimming/WaitTrimTimer") {
+ NamedState(nullptr, "Trimming/WaitTrimTimer") {
context< SnapTrimmer >().log_enter(state_name);
ceph_assert(context<Trimming>().in_flight.empty());
struct OnTimer : Context {
}
};
auto *pg = context< SnapTrimmer >().pg;
- if (pg->cct->_conf->osd_snap_trim_sleep > 0) {
+ float osd_snap_trim_sleep = pg->osd->osd->get_osd_snap_trim_sleep();
+ if (osd_snap_trim_sleep > 0) {
std::lock_guard l(pg->osd->sleep_lock);
wakeup = pg->osd->sleep_timer.add_event_after(
- pg->cct->_conf->osd_snap_trim_sleep,
+ osd_snap_trim_sleep,
new OnTimer{pg, pg->get_osdmap_epoch()});
} else {
post_event(SnapTrimTimerReady());
> reactions;
explicit WaitRWLock(my_context ctx)
: my_base(ctx),
- NamedState(context< SnapTrimmer >().pg, "Trimming/WaitRWLock") {
+ NamedState(nullptr, "Trimming/WaitRWLock") {
context< SnapTrimmer >().log_enter(state_name);
ceph_assert(context<Trimming>().in_flight.empty());
}
> reactions;
explicit WaitRepops(my_context ctx)
: my_base(ctx),
- NamedState(context< SnapTrimmer >().pg, "Trimming/WaitRepops") {
+ NamedState(nullptr, "Trimming/WaitRepops") {
context< SnapTrimmer >().log_enter(state_name);
ceph_assert(!context<Trimming>().in_flight.empty());
}
explicit WaitReservation(my_context ctx)
: my_base(ctx),
- NamedState(context< SnapTrimmer >().pg, "Trimming/WaitReservation") {
+ NamedState(nullptr, "Trimming/WaitReservation") {
context< SnapTrimmer >().log_enter(state_name);
ceph_assert(context<Trimming>().in_flight.empty());
auto *pg = context< SnapTrimmer >().pg;
> reactions;
explicit WaitScrub(my_context ctx)
: my_base(ctx),
- NamedState(context< SnapTrimmer >().pg, "Trimming/WaitScrub") {
+ NamedState(nullptr, "WaitScrub") {
context< SnapTrimmer >().log_enter(state_name);
}
void exit() {
// whiteout or no change.
void maybe_create_new_object(OpContext *ctx, bool ignore_transaction=false);
int _delete_oid(OpContext *ctx, bool no_whiteout, bool try_no_whiteout);
- int _rollback_to(OpContext *ctx, ceph_osd_op& op);
+ int _rollback_to(OpContext *ctx, OSDOp& op);
+ void _do_rollback_to(OpContext *ctx, ObjectContextRef rollback_to,
+ OSDOp& op);
public:
bool is_missing_object(const hobject_t& oid) const;
bool is_unreadable_object(const hobject_t &oid) const {
return is_missing_object(oid) ||
- !missing_loc.readable_with_acting(oid, actingset);
+ !recovery_state.get_missing_loc().readable_with_acting(
+ oid, get_actingset());
}
void maybe_kick_recovery(const hobject_t &soid);
void wait_for_unreadable_object(const hobject_t& oid, OpRequestRef op);
- void wait_for_all_missing(OpRequestRef op);
+ int get_manifest_ref_count(ObjectContextRef obc, std::string& fp_oid, OpRequestRef op);
+
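+ // read-lease checks: if the PG is laggy (lease expired), queue the op
+ // and return false; recheck_readable re-evaluates once leases renew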
+ bool check_laggy(OpRequestRef& op);
+ bool check_laggy_requeue(OpRequestRef& op);
+ void recheck_readable() override;
+
+ bool is_backfill_target(pg_shard_t osd) const {
+ return recovery_state.is_backfill_target(osd);
+ }
+ const std::set<pg_shard_t> &get_backfill_targets() const {
+ return recovery_state.get_backfill_targets();
+ }
+ bool is_async_recovery_target(pg_shard_t peer) const {
+ return recovery_state.is_async_recovery_target(peer);
+ }
+ const std::set<pg_shard_t> &get_async_recovery_targets() const {
+ return recovery_state.get_async_recovery_targets();
+ }
bool is_degraded_or_backfilling_object(const hobject_t& oid);
bool is_degraded_on_async_recovery_target(const hobject_t& soid);
void wait_for_degraded_object(const hobject_t& oid, OpRequestRef op);
bool maybe_await_blocked_head(const hobject_t &soid, OpRequestRef op);
void wait_for_blocked_object(const hobject_t& soid, OpRequestRef op);
void kick_object_context_blocked(ObjectContextRef obc);
+ void requeue_op_blocked_by_object(const hobject_t &soid);
void maybe_force_recovery();
void mark_all_unfound_lost(
int what,
- ConnectionRef con,
- ceph_tid_t tid);
+ std::function<void(int,const std::string&,ceph::buffer::list&)> on_finish);
eversion_t pick_newest_available(const hobject_t& oid);
void do_update_log_missing(
void do_update_log_missing_reply(
OpRequestRef &op);
- void on_role_change() override;
- void on_pool_change() override;
- void _on_new_interval() override;
+ void plpg_on_role_change() override;
+ void plpg_on_pool_change() override;
void clear_async_reads();
- void on_change(ObjectStore::Transaction *t) override;
- void on_activate() override;
+ void on_change(ObjectStore::Transaction &t) override;
+ void on_activate_complete() override;
void on_flushed() override;
- void on_removal(ObjectStore::Transaction *t) override;
+ void on_removal(ObjectStore::Transaction &t) override;
void on_shutdown() override;
bool check_failsafe_full() override;
- bool check_osdmap_full(const set<pg_shard_t> &missing_on) override;
- bool maybe_preempt_replica_scrub(const hobject_t& oid) override {
- return write_blocked_by_scrub(oid);
- }
+ bool maybe_preempt_replica_scrub(const hobject_t& oid) override;
int rep_repair_primary_object(const hobject_t& soid, OpContext *ctx);
// attr cache handling
void setattr_maybe_cache(
ObjectContextRef obc,
PGTransaction *t,
- const string &key,
- bufferlist &val);
+ const std::string &key,
+ ceph::buffer::list &val);
void setattrs_maybe_cache(
ObjectContextRef obc,
PGTransaction *t,
- map<string, bufferlist> &attrs);
+ std::map<std::string, ceph::buffer::list, std::less<>> &attrs);
void rmattr_maybe_cache(
ObjectContextRef obc,
PGTransaction *t,
- const string &key);
+ const std::string &key);
+ /**
+ * getattr_maybe_cache
+ *
+ * Populates val (if non-null) with the value of the attr with the specified key.
+ * Returns -ENOENT if the object does not exist, or -ENODATA if the object
+ * exists but the specified key does not.
+ */
int getattr_maybe_cache(
ObjectContextRef obc,
- const string &key,
- bufferlist *val);
+ const std::string &key,
+ ceph::buffer::list *val);
int getattrs_maybe_cache(
ObjectContextRef obc,
- map<string, bufferlist> *out);
+ std::map<std::string, ceph::buffer::list, std::less<>> *out);
public:
void set_dynamic_perf_stats_queries(
{
out << "repgather(" << &repop
<< " " << repop.v
- << " rep_tid=" << repop.rep_tid
+ << " rep_tid=" << repop.rep_tid
<< " committed?=" << repop.all_committed
<< " r=" << repop.r
<< ")";