#pragma once
+#include <functional>
#include <ostream>
#include <map>
#include <vector>
#include "osd/scheduler/OpScheduler.h"
#include "common/config.h"
-#include "include/cmp.h"
#include "common/ceph_context.h"
#include "common/mClockPriorityQueue.h"
#include "osd/scheduler/OpSchedulerItem.h"
namespace ceph::osd::scheduler {
-constexpr uint64_t default_min = 1;
-constexpr uint64_t default_max = 999999;
+constexpr double default_min = 0.0;
+constexpr double default_max = std::numeric_limits<double>::is_iec559 ?
+ std::numeric_limits<double>::infinity() :
+ std::numeric_limits<double>::max();
using client_id_t = uint64_t;
using profile_id_t = uint64_t;
client_id_t client_id;
profile_id_t profile_id;
+ auto operator<=>(const client_profile_id_t&) const = default;
friend std::ostream& operator<<(std::ostream& out,
const client_profile_id_t& client_profile) {
out << " client_id: " << client_profile.client_id
}
};
-WRITE_EQ_OPERATORS_2(client_profile_id_t, client_id, profile_id)
-WRITE_CMP_OPERATORS_2(client_profile_id_t, client_id, profile_id)
-
-
struct scheduler_id_t {
op_scheduler_class class_id;
client_profile_id_t client_profile_id;
+ auto operator<=>(const scheduler_id_t&) const = default;
friend std::ostream& operator<<(std::ostream& out,
const scheduler_id_t& sched_id) {
out << "{ class_id: " << sched_id.class_id
}
};
-WRITE_EQ_OPERATORS_2(scheduler_id_t, class_id, client_profile_id)
-WRITE_CMP_OPERATORS_2(scheduler_id_t, class_id, client_profile_id)
-
/**
* Scheduler implementation based on mclock.
*
const int whoami;
const uint32_t num_shards;
const int shard_id;
- bool is_rotational;
+ const bool is_rotational;
MonClient *monc;
- double max_osd_capacity;
- double osd_mclock_cost_per_io;
- double osd_mclock_cost_per_byte;
- std::string mclock_profile = "high_client_ops";
- struct ClientAllocs {
- uint64_t res;
- uint64_t wgt;
- uint64_t lim;
-
- ClientAllocs(uint64_t _res, uint64_t _wgt, uint64_t _lim) {
- update(_res, _wgt, _lim);
- }
- inline void update(uint64_t _res, uint64_t _wgt, uint64_t _lim) {
- res = _res;
- wgt = _wgt;
- lim = _lim;
- }
- };
- std::array<
- ClientAllocs,
- static_cast<size_t>(op_scheduler_class::client) + 1
- > client_allocs = {
- // Placeholder, get replaced with configured values
- ClientAllocs(1, 1, 1), // background_recovery
- ClientAllocs(1, 1, 1), // background_best_effort
- ClientAllocs(1, 1, 1), // immediate (not used)
- ClientAllocs(1, 1, 1) // client
- };
+ /**
+ * osd_bandwidth_cost_per_io
+ *
+ * mClock expects all queued items to have a uniform expression of
+ * "cost". However, IO devices generally have quite different capacity
+ * for sequential IO vs small random IO. This implementation handles this
+ * by expressing all costs as a number of sequential bytes written adding
+ * additional cost for each random IO equal to osd_bandwidth_cost_per_io.
+ *
+ * Thus, an IO operation requiring a total of <size> bytes to be written
+ * across <iops> different locations will have a cost of
+ * <size> + (osd_bandwidth_cost_per_io * <iops>) bytes.
+ *
+ * Set in set_osd_capacity_params_from_config in the constructor and upon
+ * config change.
+ *
+ * Has units bytes/io.
+ */
+ double osd_bandwidth_cost_per_io;
+
+ /**
+ * osd_bandwidth_capacity_per_shard
+ *
+ * mClock expects reservation and limit parameters to be expressed in units
+ * of cost/second -- which means bytes/second for this implementation.
+ *
+ * Rather than expecting users to compute appropriate limit and reservation
+ * values for each class of OSDs in their cluster, we instead express
+ * reservation and limit parameters as ratios of the OSD's maximum capacity.
+ * osd_bandwidth_capacity_per_shard is that capacity divided by the number
+ * of shards.
+ *
+ * Set in set_osd_capacity_params_from_config in the constructor and upon
+ * config change.
+ *
+ * This value gets passed to ClientRegistry::update_from_config in order
+ * to resolve the full reservation and limit parameters for mclock from
+ * the configured ratios.
+ *
+ * Has units bytes/second.
+ */
+ double osd_bandwidth_capacity_per_shard;
+
class ClientRegistry {
std::array<
crimson::dmclock::ClientInfo,
const crimson::dmclock::ClientInfo *get_external_client(
const client_profile_id_t &client) const;
public:
- void update_from_config(const ConfigProxy &conf);
+ /**
+ * update_from_config
+ *
+ * Sets the mclock parameters (reservation, weight, and limit)
+ * for each class of IO (background_recovery, background_best_effort,
+ * and client).
+ */
+ void update_from_config(
+ const ConfigProxy &conf,
+ double capacity_per_shard);
const crimson::dmclock::ClientInfo *get_info(
const scheduler_id_t &id) const;
} client_registry;
true,
true,
2>;
+ using priority_t = unsigned;
+ using SubQueue = std::map<priority_t,
+ std::list<OpSchedulerItem>,
+ std::greater<priority_t>>;
mclock_queue_t scheduler;
- std::list<OpSchedulerItem> immediate;
+ /**
+ * high_priority
+ *
+ * Holds entries to be dequeued in strict order ahead of mClock
+ * Invariant: entries are never empty
+ */
+ SubQueue high_priority;
+ priority_t immediate_class_priority = std::numeric_limits<priority_t>::max();
static scheduler_id_t get_scheduler_id(const OpSchedulerItem &item) {
return scheduler_id_t{
};
}
-public:
- mClockScheduler(CephContext *cct, int whoami, uint32_t num_shards,
- int shard_id, bool is_rotational, MonClient *monc);
- ~mClockScheduler() override;
-
- // Set the max osd capacity in iops
- void set_max_osd_capacity();
-
- // Set the cost per io for the osd
- void set_osd_mclock_cost_per_io();
-
- // Set the cost per byte for the osd
- void set_osd_mclock_cost_per_byte();
-
- // Set the mclock profile type to enable
- void set_mclock_profile();
-
- // Get the active mclock profile
- std::string get_mclock_profile();
-
- // Set "balanced" profile allocations
- void set_balanced_profile_allocations();
-
- // Set "high_recovery_ops" profile allocations
- void set_high_recovery_ops_profile_allocations();
+ /**
+  * get_io_prio_cut
+  *
+  * Maps the osd_op_queue_cut_off config option to a message priority:
+  * "high" yields CEPH_MSG_PRIO_HIGH, "debug_random" picks
+  * CEPH_MSG_PRIO_HIGH or CEPH_MSG_PRIO_LOW from a freshly seeded
+  * generator, and any other value yields CEPH_MSG_PRIO_LOW.
+  */
+ static unsigned int get_io_prio_cut(CephContext *cct) {
+   const auto &cut_off = cct->_conf->osd_op_queue_cut_off;
+   if (cut_off == "high") {
+     return CEPH_MSG_PRIO_HIGH;
+   }
+   if (cut_off != "debug_random") {
+     // default / catch-all is 'low'
+     return CEPH_MSG_PRIO_LOW;
+   }
+   // debug_random: an even draw selects 'high', an odd draw selects 'low'
+   std::random_device seed_source;
+   std::mt19937 engine(seed_source());
+   const bool pick_high = (engine() % 2) == 0;
+   return pick_high ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
+ }
- // Set "high_client_ops" profile allocations
- void set_high_client_ops_profile_allocations();
+ unsigned cutoff_priority = get_io_prio_cut(cct);
+
+ /**
+ * set_osd_capacity_params_from_config
+ *
+ * mClockScheduler uses two parameters, osd_bandwidth_cost_per_io
+ * and osd_bandwidth_capacity_per_shard, internally. These two
+ * parameters are derived from config parameters
+ * osd_mclock_max_capacity_iops_(hdd|ssd) and
+ * osd_mclock_max_sequential_bandwidth_(hdd|ssd) as well as num_shards.
+ * Invoking set_osd_capacity_params_from_config() resets those derived
+ * params based on the current config and should be invoked any time they
+ * are modified as well as in the constructor. See handle_conf_change().
+ */
+ void set_osd_capacity_params_from_config();
// Set the mclock related config params based on the profile
- void enable_mclock_profile_settings();
+ void set_config_defaults_from_profile();
- // Set mclock config parameter based on allocations
- void set_profile_config();
+public:
+ mClockScheduler(CephContext *cct, int whoami, uint32_t num_shards,
+ int shard_id, bool is_rotational, MonClient *monc);
+ ~mClockScheduler() override;
- // Calculate scale cost per item
- int calc_scaled_cost(int cost);
+ /// Calculate scaled cost per item
+ uint32_t calc_scaled_cost(int cost);
// Helper method to display mclock queues
std::string display_queues() const;
// Enqueue op in the back of the regular queue
void enqueue(OpSchedulerItem &&item) final;
- // Enqueue the op in the front of the regular queue
+ // Enqueue the op in the front of the high priority queue
void enqueue_front(OpSchedulerItem &&item) final;
// Return an op to be dispatch
// Returns if the queue is empty
bool empty() const final {
- return immediate.empty() && scheduler.empty();
+ return scheduler.empty() && high_priority.empty(); // true only when both the mClock queue and the high_priority map hold nothing
}
// Formatted output of the queue
const char** get_tracked_conf_keys() const final;
void handle_conf_change(const ConfigProxy& conf,
const std::set<std::string> &changed) final;
+private:
+ // Enqueue the op to the high priority queue
+ void enqueue_high(unsigned prio, OpSchedulerItem &&item, bool front = false);
};
}