-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
#pragma once
#include <memory>
#include <optional>
-#include <boost/intrusive_ptr.hpp>
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
#include <boost/smart_ptr/local_shared_ptr.hpp>
#include <seastar/core/future.hh>
#include "common/dout.h"
#include "crimson/net/Fwd.h"
+#include "messages/MOSDRepOpReply.h"
+#include "messages/MOSDOpReply.h"
#include "os/Transaction.h"
#include "osd/osd_types.h"
#include "crimson/osd/object_context.h"
#include "osd/PeeringState.h"
+#include "crimson/common/interruptible_future.h"
#include "crimson/common/type_helpers.h"
#include "crimson/os/futurized_collection.h"
+#include "crimson/osd/backfill_state.h"
+#include "crimson/osd/pg_interval_interrupt_condition.h"
+#include "crimson/osd/ops_executer.h"
#include "crimson/osd/osd_operations/client_request.h"
#include "crimson/osd/osd_operations/peering_event.h"
#include "crimson/osd/osd_operations/replicated_request.h"
+#include "crimson/osd/osd_operations/background_recovery.h"
#include "crimson/osd/shard_services.h"
#include "crimson/osd/osdmap_gate.h"
+#include "crimson/osd/pg_recovery.h"
+#include "crimson/osd/pg_recovery_listener.h"
+#include "crimson/osd/recovery_backend.h"
-class OSDMap;
class MQuery;
+class OSDMap;
class PGBackend;
class PGPeeringEvent;
+class osd_op_params_t;
+
namespace recovery {
class Context;
}
namespace crimson::osd {
class ClientRequest;
+class OpsExecuter;
class PG : public boost::intrusive_ref_counter<
PG,
boost::thread_unsafe_counter>,
+ public PGRecoveryListener,
PeeringState::PeeringListener,
DoutPrefixProvider
{
seastar::timer<seastar::lowres_clock> renew_lease_timer;
public:
+ template <typename T = void>
+ using interruptible_future =
+ ::crimson::interruptible::interruptible_future<
+ ::crimson::osd::IOInterruptCondition, T>;
+
PG(spg_t pgid,
pg_shard_t pg_shard,
crimson::os::CollectionRef coll_ref,
~PG();
- const pg_shard_t& get_pg_whoami() const {
+ const pg_shard_t& get_pg_whoami() const final {
return pg_whoami;
}
- const spg_t& get_pgid() const {
+ const spg_t& get_pgid() const final {
return pgid;
}
const PGBackend& get_backend() const {
return *backend;
}
-
// EpochSource
epoch_t get_osdmap_epoch() const final {
return peering_state.get_osdmap_epoch();
}
+ eversion_t get_pg_trim_to() const {
+ return peering_state.get_pg_trim_to();
+ }
+
+ eversion_t get_min_last_complete_ondisk() const {
+ return peering_state.get_min_last_complete_ondisk();
+ }
+
+ const pg_info_t& get_info() const final {
+ return peering_state.get_info();
+ }
+
// DoutPrefixProvider
std::ostream& gen_prefix(std::ostream& out) const final {
return out << *this;
// Not needed yet -- mainly for scrub scheduling
}
- void scrub_requested(bool deep, bool repair, bool need_auto = false) final {
- ceph_assert(0 == "Not implemented");
+ /// Notify PG that Primary/Replica status has changed (to update scrub registration)
+ void on_primary_status_change(bool was_primary, bool now_primary) final {
+ }
+
+ /// Need to reschedule next scrub. Assuming no change in role
+ void reschedule_scrub() final {
}
+ void scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type) final;
+
uint64_t get_snap_trimq_size() const final {
return 0;
}
void send_cluster_message(
- int osd, Message *m,
+ int osd, MessageURef m,
epoch_t epoch, bool share_map_update=false) final {
- (void)shard_services.send_to_osd(osd, m, epoch);
+ (void)shard_services.send_to_osd(osd, std::move(m), epoch);
}
void send_pg_created(pg_t pgid) final {
// will be needed for unblocking IO operations/peering
}
+ template <typename T>
+ void start_peering_event_operation(T &&evt, float delay = 0) {
+ (void) shard_services.start_operation<LocalPeeringEvent>(
+ this,
+ shard_services,
+ pg_whoami,
+ pgid,
+ delay,
+ std::forward<T>(evt));
+ }
+
void schedule_event_after(
PGPeeringEventRef event,
float delay) final {
- ceph_assert(0 == "Not implemented yet");
+ start_peering_event_operation(std::move(*event), delay);
+ }
+ std::vector<pg_shard_t> get_replica_recovery_order() const final {
+ return peering_state.get_replica_recovery_order();
}
-
void request_local_background_io_reservation(
unsigned priority,
- PGPeeringEventRef on_grant,
- PGPeeringEventRef on_preempt) final {
- ceph_assert(0 == "Not implemented yet");
+ PGPeeringEventURef on_grant,
+ PGPeeringEventURef on_preempt) final {
+ shard_services.local_reserver.request_reservation(
+ pgid,
+ on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
+ start_peering_event_operation(std::move(*on_grant));
+ }) : nullptr,
+ priority,
+ on_preempt ? make_lambda_context(
+ [this, on_preempt=std::move(on_preempt)] (int) {
+ start_peering_event_operation(std::move(*on_preempt));
+ }) : nullptr);
}
void update_local_background_io_priority(
unsigned priority) final {
- ceph_assert(0 == "Not implemented yet");
+ shard_services.local_reserver.update_priority(
+ pgid,
+ priority);
}
void cancel_local_background_io_reservation() final {
- // Not implemented yet, but gets called on exit() from some states
+ shard_services.local_reserver.cancel_reservation(
+ pgid);
}
void request_remote_recovery_reservation(
unsigned priority,
- PGPeeringEventRef on_grant,
- PGPeeringEventRef on_preempt) final {
- ceph_assert(0 == "Not implemented yet");
+ PGPeeringEventURef on_grant,
+ PGPeeringEventURef on_preempt) final {
+ shard_services.remote_reserver.request_reservation(
+ pgid,
+ on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
+ start_peering_event_operation(std::move(*on_grant));
+ }) : nullptr,
+ priority,
+ on_preempt ? make_lambda_context(
+ [this, on_preempt=std::move(on_preempt)] (int) {
+ start_peering_event_operation(std::move(*on_preempt));
+ }) : nullptr);
}
void cancel_remote_recovery_reservation() final {
- // Not implemented yet, but gets called on exit() from some states
+ shard_services.remote_reserver.cancel_reservation(
+ pgid);
}
void schedule_event_on_commit(
ceph::os::Transaction &t,
PGPeeringEventRef on_commit) final {
t.register_on_commit(
- new LambdaContext(
- [this, on_commit=std::move(on_commit)](int r){
- shard_services.start_operation<LocalPeeringEvent>(
- this,
- shard_services,
- pg_whoami,
- pgid,
- std::move(*on_commit));
+ make_lambda_context(
+ [this, on_commit=std::move(on_commit)](int) {
+ start_peering_event_operation(std::move(*on_commit));
}));
}
- void update_heartbeat_peers(set<int> peers) final {
+ void update_heartbeat_peers(std::set<int> peers) final {
// Not needed yet
}
- void set_probe_targets(const set<pg_shard_t> &probe_set) final {
+ void set_probe_targets(const std::set<pg_shard_t> &probe_set) final {
// Not needed yet
}
void clear_probe_targets() final {
void clear_want_pg_temp() final {
shard_services.remove_want_pg_temp(pgid.pgid);
}
- void publish_stats_to_osd() final {
- // Not needed yet
- }
- void clear_publish_stats() final {
- // Not needed yet
- }
void check_recovery_sources(const OSDMapRef& newmap) final {
// Not needed yet
}
- void check_blacklisted_watchers() final {
+ void check_blocklisted_watchers() final {
// Not needed yet
}
void clear_primary_state() final {
void on_role_change() final {
// Not needed yet
}
- void on_change(ceph::os::Transaction &t) final {
- // Not needed yet
- }
+ void on_change(ceph::os::Transaction &t) final;
void on_activate(interval_set<snapid_t> to_trim) final;
void on_activate_complete() final;
void on_new_interval() final {
void on_removal(ceph::os::Transaction &t) final {
// TODO
}
- ghobject_t do_delete_work(ceph::os::Transaction &t,
- ghobject_t _next) final;
+ std::pair<ghobject_t, bool>
+ do_delete_work(ceph::os::Transaction &t, ghobject_t _next) final;
// merge/split not ready
void clear_ready_to_merge() final {}
return 0;
}
-
void on_backfill_reserved() final {
- ceph_assert(0 == "Not implemented");
+ recovery_handler->on_backfill_reserved();
}
void on_backfill_canceled() final {
ceph_assert(0 == "Not implemented");
}
+
void on_recovery_reserved() final {
- ceph_assert(0 == "Not implemented");
+ recovery_handler->start_pglogbased_recovery();
}
bool try_reserve_recovery_space(
int64_t primary_num_bytes, int64_t local_num_bytes) final {
+ // TODO
return true;
}
void unreserve_recovery_space() final {}
// Utility
- bool is_primary() const {
+ bool is_primary() const final {
return peering_state.is_primary();
}
- pg_stat_t get_stats() {
- auto stats = peering_state.prepare_stats_for_publish(
- false,
- pg_stat_t(),
- object_stat_collection_t());
- ceph_assert(stats);
- return *stats;
+ bool is_nonprimary() const {
+ return peering_state.is_nonprimary();
+ }
+ bool is_peered() const final {
+ return peering_state.is_peered();
+ }
+ bool is_recovering() const final {
+ return peering_state.is_recovering();
+ }
+ bool is_backfilling() const final {
+ return peering_state.is_backfilling();
+ }
+ uint64_t get_last_user_version() const {
+ return get_info().last_user_version;
}
bool get_need_up_thru() const {
return peering_state.get_need_up_thru();
}
+ epoch_t get_same_interval_since() const {
+ return get_info().history.same_interval_since;
+ }
const auto& get_pool() const {
return peering_state.get_pool();
}
+ pg_shard_t get_primary() const {
+ return peering_state.get_primary();
+ }
/// initialize created PG
void init(
int acting_primary,
const pg_history_t& history,
const PastIntervals& pim,
- bool backfill,
ceph::os::Transaction &t);
seastar::future<> read_state(crimson::os::FuturizedStore* store);
void handle_activate_map(PeeringCtx &rctx);
void handle_initialize(PeeringCtx &rctx);
- static std::pair<hobject_t, RWState::State> get_oid_and_lock(
- const MOSDOp &m,
- const OpInfo &op_info);
+ static hobject_t get_oid(const hobject_t& hobj);
+ static RWState::State get_lock_type(const OpInfo &op_info);
static std::optional<hobject_t> resolve_oid(
const SnapSet &snapset,
const hobject_t &oid);
using load_obc_ertr = crimson::errorator<
crimson::ct_error::object_corrupted>;
- load_obc_ertr::future<
+ using load_obc_iertr =
+ ::crimson::interruptible::interruptible_errorator<
+ ::crimson::osd::IOInterruptCondition,
+ load_obc_ertr>;
+ using interruptor = ::crimson::interruptible::interruptor<
+ ::crimson::osd::IOInterruptCondition>;
+ load_obc_iertr::future<
std::pair<crimson::osd::ObjectContextRef, bool>>
get_or_load_clone_obc(
hobject_t oid, crimson::osd::ObjectContextRef head_obc);
- load_obc_ertr::future<
+ load_obc_iertr::future<
std::pair<crimson::osd::ObjectContextRef, bool>>
get_or_load_head_obc(hobject_t oid);
- load_obc_ertr::future<ObjectContextRef> get_locked_obc(
- Operation *op,
- const hobject_t &oid,
- RWState::State type);
+ load_obc_iertr::future<crimson::osd::ObjectContextRef>
+ load_head_obc(ObjectContextRef obc);
+
+ load_obc_iertr::future<>
+ reload_obc(crimson::osd::ObjectContext& obc) const;
+
public:
- template <typename F>
- auto with_locked_obc(
- Ref<MOSDOp> &m,
+ using with_obc_func_t =
+ std::function<load_obc_iertr::future<> (ObjectContextRef)>;
+
+ using obc_accessing_list_t = boost::intrusive::list<
+ ObjectContext,
+ ObjectContext::obc_accessing_option_t>;
+ obc_accessing_list_t obc_set_accessing;
+
+ template<RWState::State State>
+ load_obc_iertr::future<> with_head_obc(hobject_t oid, with_obc_func_t&& func);
+
+ template<RWState::State State>
+ interruptible_future<> with_locked_obc(
+ ObjectContextRef obc,
+ with_obc_func_t&& f);
+ load_obc_iertr::future<> with_locked_obc(
+ const hobject_t &hobj,
const OpInfo &op_info,
- Operation *op,
- F &&f) {
- auto [oid, type] = get_oid_and_lock(*m, op_info);
- return get_locked_obc(op, oid, type)
- .safe_then([this, f=std::forward<F>(f), type=type](auto obc) {
- return f(obc).finally([this, obc, type=type] {
- obc->put_lock_type(type);
- return load_obc_ertr::now();
- });
- });
- }
-
- seastar::future<> handle_rep_op(Ref<MOSDRepOp> m);
- void handle_rep_op_reply(crimson::net::Connection* conn,
+ with_obc_func_t&& f);
+
+ interruptible_future<> handle_rep_op(Ref<MOSDRepOp> m);
+ void handle_rep_op_reply(crimson::net::ConnectionRef conn,
const MOSDRepOpReply& m);
void print(std::ostream& os) const;
+ void dump_primary(Formatter*);
private:
- void do_peering_event(
- const boost::statechart::event_base &evt,
- PeeringCtx &rctx);
- seastar::future<Ref<MOSDOpReply>> do_osd_ops(
+ template<RWState::State State>
+ load_obc_iertr::future<> with_head_obc(
+ ObjectContextRef obc,
+ bool existed,
+ with_obc_func_t&& func);
+ template<RWState::State State>
+ interruptible_future<> with_existing_head_obc(
+ ObjectContextRef head,
+ with_obc_func_t&& func);
+
+ template<RWState::State State>
+ load_obc_iertr::future<> with_clone_obc(hobject_t oid, with_obc_func_t&& func);
+ template<RWState::State State>
+ interruptible_future<> with_existing_clone_obc(
+ ObjectContextRef clone, with_obc_func_t&& func);
+
+ load_obc_iertr::future<ObjectContextRef> get_locked_obc(
+ Operation *op,
+ const hobject_t &oid,
+ RWState::State type);
+
+ void fill_op_params_bump_pg_version(
+ osd_op_params_t& osd_op_p,
+ const bool user_modify);
+ using do_osd_ops_ertr = crimson::errorator<
+ crimson::ct_error::eagain>;
+ using do_osd_ops_iertr =
+ ::crimson::interruptible::interruptible_errorator<
+ ::crimson::osd::IOInterruptCondition,
+ ::crimson::errorator<crimson::ct_error::eagain>>;
+ template <typename Ret = void>
+ using pg_rep_op_fut_t =
+ std::tuple<interruptible_future<>,
+ do_osd_ops_iertr::future<Ret>>;
+ do_osd_ops_iertr::future<pg_rep_op_fut_t<MURef<MOSDOpReply>>> do_osd_ops(
Ref<MOSDOp> m,
- ObjectContextRef obc);
- seastar::future<Ref<MOSDOpReply>> do_pg_ops(Ref<MOSDOp> m);
- seastar::future<> do_osd_op(
- ObjectState& os,
- OSDOp& op,
- ceph::os::Transaction& txn);
- seastar::future<ceph::bufferlist> do_pgnls(ceph::bufferlist& indata,
- const std::string& nspace,
- uint64_t limit);
- seastar::future<> submit_transaction(ObjectContextRef&& obc,
- ceph::os::Transaction&& txn,
- const MOSDOp& req);
+ ObjectContextRef obc,
+ const OpInfo &op_info);
+ using do_osd_ops_success_func_t =
+ std::function<do_osd_ops_iertr::future<>()>;
+ using do_osd_ops_failure_func_t =
+ std::function<do_osd_ops_iertr::future<>(const std::error_code&)>;
+ struct do_osd_ops_params_t;
+ do_osd_ops_iertr::future<pg_rep_op_fut_t<>> do_osd_ops(
+ ObjectContextRef obc,
+ std::vector<OSDOp>& ops,
+ const OpInfo &op_info,
+ const do_osd_ops_params_t& params,
+ do_osd_ops_success_func_t success_func,
+ do_osd_ops_failure_func_t failure_func);
+ template <class Ret, class SuccessFunc, class FailureFunc>
+ do_osd_ops_iertr::future<pg_rep_op_fut_t<Ret>> do_osd_ops_execute(
+ seastar::lw_shared_ptr<OpsExecuter> ox,
+ std::vector<OSDOp>& ops,
+ const OpInfo &op_info,
+ SuccessFunc&& success_func,
+ FailureFunc&& failure_func);
+ interruptible_future<MURef<MOSDOpReply>> do_pg_ops(Ref<MOSDOp> m);
+ std::tuple<interruptible_future<>, interruptible_future<>>
+ submit_transaction(
+ const OpInfo& op_info,
+ const std::vector<OSDOp>& ops,
+ ObjectContextRef&& obc,
+ ceph::os::Transaction&& txn,
+ osd_op_params_t&& oop);
+ interruptible_future<> repair_object(
+ const hobject_t& oid,
+ eversion_t& v);
private:
OSDMapGate osdmap_gate;
public:
cached_map_t get_osdmap() { return osdmap; }
-
+ eversion_t next_version() {
+ return eversion_t(get_osdmap_epoch(),
+ ++projected_last_update.version);
+ }
+ ShardServices& get_shard_services() final {
+ return shard_services;
+ }
+ seastar::future<> stop();
private:
std::unique_ptr<PGBackend> backend;
+ std::unique_ptr<RecoveryBackend> recovery_backend;
+ std::unique_ptr<PGRecovery> recovery_handler;
PeeringState peering_state;
eversion_t projected_last_update;
+public:
+ // PeeringListener
+ void publish_stats_to_osd() final;
+ void clear_publish_stats() final;
+ pg_stat_t get_stats() const;
+private:
+ std::optional<pg_stat_t> pg_stats;
+
+public:
+ RecoveryBackend* get_recovery_backend() final {
+ return recovery_backend.get();
+ }
+ PGRecovery* get_recovery_handler() final {
+ return recovery_handler.get();
+ }
+ PeeringState& get_peering_state() final {
+ return peering_state;
+ }
+ bool has_reset_since(epoch_t epoch) const final {
+ return peering_state.pg_has_reset_since(epoch);
+ }
+
+ const pg_missing_tracker_t& get_local_missing() const {
+ return peering_state.get_pg_log().get_missing();
+ }
+ epoch_t get_last_peering_reset() const final {
+ return peering_state.get_last_peering_reset();
+ }
+ const std::set<pg_shard_t> &get_acting_recovery_backfill() const {
+ return peering_state.get_acting_recovery_backfill();
+ }
+ bool is_backfill_target(pg_shard_t osd) const {
+ return peering_state.is_backfill_target(osd);
+ }
+ void begin_peer_recover(pg_shard_t peer, const hobject_t oid) {
+ peering_state.begin_peer_recover(peer, oid);
+ }
+ uint64_t min_peer_features() const {
+ return peering_state.get_min_peer_features();
+ }
+ const std::map<hobject_t, std::set<pg_shard_t>>&
+ get_missing_loc_shards() const {
+ return peering_state.get_missing_loc().get_missing_locs();
+ }
+ const std::map<pg_shard_t, pg_missing_t> &get_shard_missing() const {
+ return peering_state.get_peer_missing();
+ }
+ epoch_t get_interval_start_epoch() const {
+ return get_info().history.same_interval_since;
+ }
+ const pg_missing_const_i* get_shard_missing(pg_shard_t shard) const {
+ if (shard == pg_whoami)
+ return &get_local_missing();
+ else {
+ auto it = peering_state.get_peer_missing().find(shard);
+ if (it == peering_state.get_peer_missing().end())
+ return nullptr;
+ else
+ return &it->second;
+ }
+ }
+ interruptible_future<std::tuple<bool, int>> already_complete(const osd_reqid_t& reqid);
+ int get_recovery_op_priority() const {
+ int64_t pri = 0;
+ get_pool().info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri);
+ return pri > 0 ? pri : crimson::common::local_conf()->osd_recovery_op_priority;
+ }
+ seastar::future<> mark_unfound_lost(int) {
+ // TODO: see PrimaryLogPG::mark_all_unfound_lost()
+ return seastar::now();
+ }
+
+ bool old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch) const;
+
+ template <typename MsgType>
+ bool can_discard_replica_op(const MsgType& m) const {
+ return can_discard_replica_op(m, m.map_epoch);
+ }
+
+private:
+ // instead of seastar::gate, we use a boolean flag to indicate
+ // whether the system is shutting down, as we don't need to track
+ // continuations here.
+ bool stopping = false;
+
class WaitForActiveBlocker : public BlockerT<WaitForActiveBlocker> {
PG *pg;
WaitForActiveBlocker(PG *pg) : pg(pg) {}
void on_active();
blocking_future<> wait();
+ seastar::future<> stop();
} wait_for_active_blocker;
friend std::ostream& operator<<(std::ostream&, const PG& pg);
friend class ClientRequest;
+ friend struct CommonClientRequest;
friend class PGAdvanceMap;
friend class PeeringEvent;
friend class RepRequest;
+ friend class BackfillRecovery;
+ friend struct PGFacade;
+ friend class InternalClientRequest;
+ friend class WatchTimeoutRequest;
+private:
+ seastar::future<bool> find_unfound() {
+ return seastar::make_ready_future<bool>(true);
+ }
+
+ bool can_discard_replica_op(const Message& m, epoch_t m_map_epoch) const;
+ bool can_discard_op(const MOSDOp& m) const;
+ bool is_missing_object(const hobject_t& soid) const {
+ return peering_state.get_pg_log().get_missing().get_items().count(soid);
+ }
+ bool is_unreadable_object(const hobject_t &oid,
+ eversion_t* v = 0) const final {
+ return is_missing_object(oid) ||
+ !peering_state.get_missing_loc().readable_with_acting(
+ oid, get_actingset(), v);
+ }
+ bool is_degraded_or_backfilling_object(const hobject_t& soid) const;
+ const std::set<pg_shard_t> &get_actingset() const {
+ return peering_state.get_actingset();
+ }
+
+private:
+ BackfillRecovery::BackfillRecoveryPipeline backfill_pipeline;
+
+ friend class IOInterruptCondition;
+};
+
+struct PG::do_osd_ops_params_t {
+ crimson::net::ConnectionRef get_connection() const {
+ return nullptr;
+ }
+ osd_reqid_t get_reqid() const {
+ return reqid;
+ }
+ utime_t get_mtime() const {
+ return mtime;
+ };
+ epoch_t get_map_epoch() const {
+ return map_epoch;
+ }
+ entity_inst_t get_orig_source_inst() const {
+ return orig_source_inst;
+ }
+ uint64_t get_features() const {
+ return features;
+ }
+ crimson::net::ConnectionRef conn;
+ osd_reqid_t reqid;
+ utime_t mtime;
+ epoch_t map_epoch;
+ entity_inst_t orig_source_inst;
+ uint64_t features;
};
std::ostream& operator<<(std::ostream&, const PG& pg);