#include "common/EventTrace.h"
#include "osd/osd_perf_counters.h"
#include "common/Finisher.h"
+#include "scrubber/osd_scrub_sched.h"
#define CEPH_OSD_PROTOCOL 10 /* cluster internal */
class LogChannel;
class MOSDPGCreate2;
-class MOSDPGQuery;
class MOSDPGNotify;
class MOSDPGInfo;
class MOSDPGRemove;
CephContext *cct;
ObjectStore::CollectionHandle meta_ch;
const int whoami;
- ObjectStore *&store;
+ ObjectStore * const store;
LogClient &log_client;
LogChannelRef clog;
PGRecoveryStats &pg_recovery_stats;
}
entity_name_t get_cluster_msgr_name() const;
-private:
- // -- scrub scheduling --
- ceph::mutex sched_scrub_lock = ceph::make_mutex("OSDService::sched_scrub_lock");
- int scrubs_local;
- int scrubs_remote;
public:
- struct ScrubJob {
- CephContext* cct;
- /// pg to be scrubbed
- spg_t pgid;
- /// a time scheduled for scrub. but the scrub could be delayed if system
- /// load is too high or it fails to fall in the scrub hours
- utime_t sched_time;
- /// the hard upper bound of scrub time
- utime_t deadline;
- ScrubJob() : cct(nullptr) {}
- explicit ScrubJob(CephContext* cct, const spg_t& pg,
- const utime_t& timestamp,
- double pool_scrub_min_interval = 0,
- double pool_scrub_max_interval = 0, bool must = true);
- /// order the jobs by sched_time
- bool operator<(const ScrubJob& rhs) const;
- };
- std::set<ScrubJob> sched_scrub_pg;
-
- /// @returns the scrub_reg_stamp used for unregistering the scrub job
- utime_t reg_pg_scrub(spg_t pgid,
- utime_t t,
- double pool_scrub_min_interval,
- double pool_scrub_max_interval,
- bool must) {
- ScrubJob scrub_job(cct, pgid, t, pool_scrub_min_interval, pool_scrub_max_interval,
- must);
- std::lock_guard l(OSDService::sched_scrub_lock);
- sched_scrub_pg.insert(scrub_job);
- return scrub_job.sched_time;
- }
-
- void unreg_pg_scrub(spg_t pgid, utime_t t) {
- std::lock_guard l(sched_scrub_lock);
- size_t removed = sched_scrub_pg.erase(ScrubJob(cct, pgid, t));
- ceph_assert(removed);
- }
-
- bool first_scrub_stamp(ScrubJob *out) {
- std::lock_guard l(sched_scrub_lock);
- if (sched_scrub_pg.empty())
- return false;
- std::set<ScrubJob>::iterator iter = sched_scrub_pg.begin();
- *out = *iter;
- return true;
- }
- bool next_scrub_stamp(const ScrubJob& next,
- ScrubJob *out) {
- std::lock_guard l(sched_scrub_lock);
- if (sched_scrub_pg.empty())
- return false;
- std::set<ScrubJob>::const_iterator iter = sched_scrub_pg.upper_bound(next);
- if (iter == sched_scrub_pg.cend())
- return false;
- *out = *iter;
- return true;
- }
-
- void dumps_scrub(ceph::Formatter* f);
-
- bool can_inc_scrubs();
- bool inc_scrubs_local();
- void dec_scrubs_local();
- bool inc_scrubs_remote();
- void dec_scrubs_remote();
- void dump_scrub_reservations(ceph::Formatter *f);
void reply_op_error(OpRequestRef op, int err);
void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv,
std::vector<pg_log_op_return_item_t> op_returns);
void handle_misdirected_op(PG *pg, OpRequestRef op);
+ private:
+ /**
+ * The entity that maintains the set of PGs we may scrub (i.e. those for which
+ * we are the primary), and schedules their scrubbing.
+ */
+ ScrubQueue m_scrub_queue;
-private:
+ public:
+ ScrubQueue& get_scrub_services() { return m_scrub_queue; }
+
+ /**
+ * A callback used by the ScrubQueue object to initiate a scrub on a specific PG.
+ *
+ * The request might fail for multiple reasons, as ScrubQueue cannot on its own
+ * check some of the PG-specific preconditions; those are checked here. See
+ * the Scrub::schedule_result_t definition.
+ *
+ * @param pgid to scrub
+ * @param allow_requested_repair_only
+ * @return a Scrub::schedule_result_t detailing either a success, or the failure reason.
+ */
+ Scrub::schedule_result_t initiate_a_scrub(spg_t pgid, bool allow_requested_repair_only);
+
+
+ private:
// -- agent shared state --
ceph::mutex agent_lock = ceph::make_mutex("OSDService::agent_lock");
ceph::condition_variable agent_cond;
void queue_recovery_context(PG *pg, GenContext<ThreadPool::TPHandle&> *c);
void queue_for_snap_trim(PG *pg);
void queue_for_scrub(PG* pg, Scrub::scrub_prio_t with_priority);
+
void queue_scrub_after_repair(PG* pg, Scrub::scrub_prio_t with_priority);
- /// queue the message (-> event) that all replicas reserved scrub resources for us
+ /// queue the message (-> event) that all replicas have reserved scrub resources for us
void queue_for_scrub_granted(PG* pg, Scrub::scrub_prio_t with_priority);
/// queue the message (-> event) that some replicas denied our scrub resources request
/// Signals that all pending updates were applied
void queue_scrub_applied_update(PG* pg, Scrub::scrub_prio_t with_priority);
+ /// Signals that the selected chunk (objects range) is available for scrubbing
+ void queue_scrub_chunk_free(PG* pg, Scrub::scrub_prio_t with_priority);
+
+ /// The chunk selected is blocked by user operations, and cannot be scrubbed now
+ void queue_scrub_chunk_busy(PG* pg, Scrub::scrub_prio_t with_priority);
+
/// The block-range that was locked and prevented the scrubbing - is freed
void queue_scrub_unblocking(PG* pg, Scrub::scrub_prio_t with_priority);
/// Signals that all write OPs are done
void queue_scrub_digest_update(PG* pg, Scrub::scrub_prio_t with_priority);
+ /// Signals that the local (Primary's) scrub map is ready
+ void queue_scrub_got_local_map(PG* pg, Scrub::scrub_prio_t with_priority);
+
/// Signals that we (the Primary) got all waited-for scrub-maps from our replicas
void queue_scrub_got_repl_maps(PG* pg, Scrub::scrub_prio_t with_priority);
+ /// Signals that all chunks were handled
+ /// Note: always with high priority, as it must be acted upon before the
+ /// next scrub request arrives from the Primary (and the primary is free
+ /// to send the request once the replica's map is received).
+ void queue_scrub_is_finished(PG* pg);
+
+ /// Signals that there are more chunks to handle
+ void queue_scrub_next_chunk(PG* pg, Scrub::scrub_prio_t with_priority);
+
+ /// Signals that we have finished comparing the maps for this chunk
+ /// Note: required, as in Crimson this operation is 'futurized'.
+ void queue_scrub_maps_compared(PG* pg, Scrub::scrub_prio_t with_priority);
+
void queue_for_rep_scrub(PG* pg,
Scrub::scrub_prio_t with_high_priority,
- unsigned int qu_priority);
+ unsigned int qu_priority,
+ Scrub::act_token_t act_token);
/// Signals a change in the number of in-flight recovery writes
void queue_scrub_replica_pushes(PG *pg, Scrub::scrub_prio_t with_priority);
+ /// (not in Crimson) Queue a SchedReplica event to be sent to the replica, to
+ /// trigger a re-check of the availability of the scrub map prepared by the
+ /// backend.
void queue_for_rep_scrub_resched(PG* pg,
Scrub::scrub_prio_t with_high_priority,
- unsigned int qu_priority);
+ unsigned int qu_priority,
+ Scrub::act_token_t act_token);
void queue_for_pg_delete(spg_t pgid, epoch_t e);
bool try_finish_pg_delete(PG *pg, unsigned old_pg_num);
template <class MSG_TYPE>
void queue_scrub_event_msg(PG* pg,
Scrub::scrub_prio_t with_priority,
- unsigned int qu_priority);
+ unsigned int qu_priority,
+ Scrub::act_token_t act_token);
/// An alternative version of queue_scrub_event_msg(), in which the queuing priority is
/// provided by the executing scrub (i.e. taken from PgScrubber::m_flags)
const OSDMapRef& osdmap,
unsigned *pushes_to_free);
- void _wake_pg_slot(spg_t pgid, OSDShardPGSlot *slot);
+ int _wake_pg_slot(spg_t pgid, OSDShardPGSlot *slot);
void identify_splits_and_merges(
const OSDMapRef& as_of_osdmap,
void register_and_wake_split_child(PG *pg);
void unprime_split_children(spg_t parent, unsigned old_pg_num);
void update_scheduler_config();
+ std::string get_scheduler_type();
OSDShard(
int id,
MgrClient mgrc;
PerfCounters *logger;
PerfCounters *recoverystate_perf;
- ObjectStore *store;
+ std::unique_ptr<ObjectStore> store;
#ifdef HAVE_LIBFUSE
FuseStore *fuse_store = nullptr;
#endif
// asok
friend class OSDSocketHook;
class OSDSocketHook *asok_hook;
+ using PGRefOrError = std::tuple<std::optional<PGRef>, int>;
+ PGRefOrError locate_asok_target(const cmdmap_t& cmdmap,
+ std::stringstream& ss, bool only_primary);
+ int asok_route_to_pg(bool only_primary,
+ std::string_view prefix,
+ cmdmap_t cmdmap,
+ Formatter *f,
+ std::stringstream& ss,
+ const bufferlist& inbl,
+ bufferlist& outbl,
+ std::function<void(int, const std::string&, bufferlist&)> on_finish);
void asok_command(
std::string_view prefix,
const cmdmap_t& cmdmap,
return service.get_tid();
}
- double scrub_sleep_time(bool must_scrub);
-
// -- generic pg peering --
- PeeringCtx create_context();
void dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap,
ThreadPool::TPHandle *handle = NULL);
bool is_fast_dispatch);
void handle_fast_pg_create(MOSDPGCreate2 *m);
- void handle_fast_pg_query(MOSDPGQuery *m);
void handle_pg_query_nopg(const MQuery& q);
void handle_fast_pg_notify(MOSDPGNotify *m);
void handle_pg_notify_nopg(const MNotifyRec& q);
void sched_scrub();
void resched_all_scrubs();
bool scrub_random_backoff();
- bool scrub_load_below_threshold();
- bool scrub_time_permit(utime_t now);
// -- status reporting --
MPGStats *collect_pg_stats();
/* internal and external can point to the same messenger, they will still
* be cleaned up properly*/
OSD(CephContext *cct_,
- ObjectStore *store_,
+ std::unique_ptr<ObjectStore> store_,
int id,
Messenger *internal,
Messenger *external,
~OSD() override;
// static bits
- static int mkfs(CephContext *cct, ObjectStore *store, uuid_d fsid, int whoami, std::string osdspec_affinity);
+ static int mkfs(CephContext *cct,
+ std::unique_ptr<ObjectStore> store,
+ uuid_d fsid,
+ int whoami,
+ std::string osdspec_affinity);
/* remove any non-user xattrs from a std::map of them */
void filter_xattrs(std::map<std::string, ceph::buffer::ptr>& attrs) {
double *elapsed,
std::ostream& ss);
int mon_cmd_set_config(const std::string &key, const std::string &val);
+ bool unsupported_objstore_for_qos();
void scrub_purged_snaps();
void probe_smart(const std::string& devid, std::ostream& ss);