*/
#pragma once
+#include <boost/intrusive/slist.hpp>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/circular_buffer.hh>
-#include <seastar/util/noncopyable_function.hh>
+#include <functional>
+#include <atomic>
#include <queue>
#include <chrono>
#include <unordered_set>
+#include <optional>
+
+namespace bi = boost::intrusive;
namespace seastar {
+class fair_group_rover;
+
/// \brief describes a request that passes through the \ref fair_queue.
///
/// A ticket is specified by a \c weight and a \c size. For example, one can specify a request of \c weight
class fair_queue_ticket {
uint32_t _weight = 0; ///< the total weight of these requests for capacity purposes (IOPS).
uint32_t _size = 0; ///< the total effective size of these requests
+ friend class fair_group_rover; // grants fair_group_rover access to the private _weight and _size
public:
/// Constructs a fair_queue_ticket with a given \c weight and a given \c size
///
/// \param weight the weight of the request
/// \param size the size of the request
- fair_queue_ticket(uint32_t weight, uint32_t size);
- fair_queue_ticket() {}
- fair_queue_ticket operator+(fair_queue_ticket desc) const;
- fair_queue_ticket operator-(fair_queue_ticket desc) const;
+ fair_queue_ticket(uint32_t weight, uint32_t size) noexcept;
+ fair_queue_ticket() noexcept {}
+ fair_queue_ticket operator+(fair_queue_ticket desc) const noexcept;
+ fair_queue_ticket operator-(fair_queue_ticket desc) const noexcept;
/// Increase the quantity represented in this ticket by the amount represented by \c desc
/// \param desc another \ref fair_queue_ticket whose \c weight \c and size will be added to this one
- fair_queue_ticket& operator+=(fair_queue_ticket desc);
+ fair_queue_ticket& operator+=(fair_queue_ticket desc) noexcept;
/// Decreases the quantity represented in this ticket by the amount represented by \c desc
/// \param desc another \ref fair_queue_ticket whose \c weight \c and size will be decremented from this one
- fair_queue_ticket& operator-=(fair_queue_ticket desc);
+ fair_queue_ticket& operator-=(fair_queue_ticket desc) noexcept;
+ /// Checks if this ticket is fully equal to another one
+ /// \param desc another \ref fair_queue_ticket to compare with
+ bool operator==(const fair_queue_ticket& desc) const noexcept;
- /// \returns true if this fair_queue_ticket is strictly less than \c rhs.
- ///
- /// For a fair_queue_ticket to be considered strictly less than another, both its quantities need to be
- /// less than the other. Note that there is no total ordering between two fair_queue_tickets
- //
- /// \param rhs another \ref fair_queue_ticket to be compared to this one.
- bool strictly_less(fair_queue_ticket rhs) const;
+ std::chrono::microseconds duration_at_pace(float weight_pace, float size_pace) const noexcept; // NOTE(review): presumably the estimated time to process this ticket at the given per-unit paces (it backs fair_queue::duration()) -- confirm in the .cc
/// \returns true if the fair_queue_ticket represents a non-zero quantity.
///
/// For a fair_queue ticket to be non-zero, at least one of its represented quantities need to
/// be non-zero
- explicit operator bool() const;
+ explicit operator bool() const noexcept;
friend std::ostream& operator<<(std::ostream& os, fair_queue_ticket t);
///
/// It is however not legal for the axis to have any quantity set to zero.
/// \param axis another \ref fair_queue_ticket to be used as a a base vector against which to normalize this fair_queue_ticket.
- float normalize(fair_queue_ticket axis) const;
+ float normalize(fair_queue_ticket axis) const noexcept;
+};
+
+class fair_group_rover { // two-component (weight, size) value; fair_group keeps an atomic head and tail of this type
+ uint32_t _weight = 0; // weight component of the rover
+ uint32_t _size = 0; // size component of the rover
+
+public:
+ fair_group_rover(uint32_t weight, uint32_t size) noexcept;
+
+ /*
+ * For both dimensions, checks if the current rover is ahead of the
+ * other and returns the difference. If this one is behind, returns zero.
+ */
+ fair_queue_ticket maybe_ahead_of(const fair_group_rover& other) const noexcept;
+ fair_group_rover operator+(fair_queue_ticket t) const noexcept; // NOTE(review): presumably advances both components by the ticket's values -- confirm in the .cc
+ fair_group_rover& operator+=(fair_queue_ticket t) noexcept; // in-place counterpart of operator+
+
+ friend std::ostream& operator<<(std::ostream& os, fair_group_rover r);
};
/// \addtogroup io-module
/// @{
-/// \cond internal
-class priority_class {
- struct request {
- noncopyable_function<void()> func;
- fair_queue_ticket desc;
- };
+class fair_queue_entry { // a ticket plus an intrusive list hook; this is what fair_queue::queue() accepts
friend class fair_queue;
- uint32_t _shares = 0;
- float _accumulated = 0;
- circular_buffer<request> _queue;
- bool _queued = false;
- friend struct shared_ptr_no_esft<priority_class>;
- explicit priority_class(uint32_t shares) noexcept : _shares(std::max(shares, 1u)) {}
+ fair_queue_ticket _ticket; // the ticket this entry was constructed with
+ bi::slist_member_hook<> _hook; // member hook used by container_list_t to link entries together
public:
- /// \brief return the current amount of shares for this priority class
- uint32_t shares() const noexcept {
- return _shares;
- }
-
- void update_shares(uint32_t shares) noexcept {
- _shares = (std::max(shares, 1u));
- }
+ fair_queue_entry(fair_queue_ticket t) noexcept
+ : _ticket(std::move(t)) {}
+ using container_list_t = bi::slist<fair_queue_entry, // intrusive singly-linked list of entries (no per-node allocation)
+ bi::constant_time_size<false>, // the list does not keep an element counter
+ bi::cache_last<true>, // the list remembers its last node, which enables push_back()
+ bi::member_hook<fair_queue_entry, bi::slist_member_hook<>, &fair_queue_entry::_hook>>;
+
+ fair_queue_ticket ticket() const noexcept { return _ticket; } // returns a copy of the stored ticket
};
-/// \endcond
-/// \brief Priority class, to be used with a given \ref fair_queue
+/// \brief Group of queues class
///
-/// An instance of this class is associated with a given \ref fair_queue. When registering
-/// a class, the caller will receive a \ref lw_shared_ptr to an object of this class. All its methods
-/// are private, so the only thing the caller is expected to do with it is to pass it later
-/// to the \ref fair_queue to identify a given class.
+/// This is a fair group. One or more fair queues attach to it. On machines having a
+/// big* number of shards, queues use the group to borrow/lend the capacity needed for
+/// dispatching requests.
///
-/// \related fair_queue
-using priority_class_ptr = lw_shared_ptr<priority_class>;
+/// * Big means that when all shards submit requests all together the disk is unable to
+/// dispatch them efficiently. The inability can be of two kinds -- either the disk cannot
+/// cope with the number of arriving requests, or the total size of the data within
+/// the given time frame exceeds the disk throughput.
+class fair_group {
+ // The rovers are kept in atomics that are required to be lock-free.
+ using fair_group_atomic_rover = std::atomic<fair_group_rover>;
+ static_assert(fair_group_atomic_rover::is_always_lock_free);
+
+ fair_group_atomic_rover _capacity_tail;
+ fair_group_atomic_rover _capacity_head;
+ fair_queue_ticket _maximum_capacity;
+
+public:
+ /// Capacity limits a group is constructed with.
+ struct config {
+ unsigned max_req_count;
+ unsigned max_bytes_count;
+
+ /// Builds a config from the given capacity, expressed as the maximum
+ /// number of requests and the maximum number of bytes.
+ ///
+ /// \param max_requests how many concurrent requests are allowed in this queue.
+ /// \param max_bytes how many total bytes are allowed in this queue.
+ config(unsigned max_requests, unsigned max_bytes) noexcept
+ : max_req_count{max_requests}
+ , max_bytes_count{max_bytes}
+ {}
+ };
+
+ explicit fair_group(config cfg) noexcept;
+ // A group cannot be moved.
+ fair_group(fair_group&&) = delete;
+
+ void release_capacity(fair_queue_ticket cap) noexcept;
+ fair_group_rover grab_capacity(fair_queue_ticket cap) noexcept;
+
+ /// \returns a copy of the maximum capacity ticket stored in the group
+ fair_queue_ticket maximum_capacity() const noexcept {
+ return _maximum_capacity;
+ }
+
+ /// \returns the current value of the head rover, loaded with relaxed memory ordering
+ fair_group_rover head() const noexcept {
+ const fair_group_rover current = _capacity_head.load(std::memory_order_relaxed);
+ return current;
+ }
+};
/// \brief Fair queuing class
///
/// 1 share. Higher weights for a request will consume a proportionally higher amount of
/// shares.
///
-/// The user of this interface is expected to register multiple `priority_class`
+/// The user of this interface is expected to register multiple `priority_class_data`
/// objects, which will each have a shares attribute.
///
/// Internally, each priority class may keep a separate queue of requests.
/// \sets the operation parameters of a \ref fair_queue
/// \related fair_queue
struct config {
- std::chrono::microseconds tau = std::chrono::milliseconds(100);
- unsigned max_req_count = std::numeric_limits<unsigned>::max();
- unsigned max_bytes_count = std::numeric_limits<unsigned>::max();
+ std::chrono::microseconds tau = std::chrono::milliseconds(5);
+ // Time (in microseconds) it takes to process one ticket value
+ float ticket_size_pace; // pace for the size component; unlike tau it has no default member initializer
+ float ticket_weight_pace; // pace for the weight component; unlike tau it has no default member initializer
};
-private:
- friend priority_class;
+ using class_id = unsigned int; // identifier type taken by register_priority_class(), queue() and friends
+ class priority_class_data; // forward declaration only; not defined in this part of the header
+
+private:
+ using priority_class_ptr = priority_class_data*; // plain pointer; the objects themselves are held by unique_ptr in _priority_classes
struct class_compare {
- bool operator() (const priority_class_ptr& lhs, const priority_class_ptr& rhs) const {
- return lhs->_accumulated > rhs->_accumulated;
- }
+ bool operator() (const priority_class_ptr& lhs, const priority_class_ptr & rhs) const noexcept; // now declaration-only: the inline body was removed
};
config _config;
- fair_queue_ticket _maximum_capacity;
- fair_queue_ticket _current_capacity;
+ fair_group& _group; // the fair_group whose head() is consulted by next_pending_aio(); held by reference
fair_queue_ticket _resources_executing;
fair_queue_ticket _resources_queued;
unsigned _requests_executing = 0;
clock_type _base;
using prioq = std::priority_queue<priority_class_ptr, std::vector<priority_class_ptr>, class_compare>;
prioq _handles;
- std::unordered_set<priority_class_ptr> _all_classes;
-
- void push_priority_class(priority_class_ptr pc);
+ std::vector<std::unique_ptr<priority_class_data>> _priority_classes; // owns the priority_class_data objects; NOTE(review): presumably indexed by class_id -- confirm in the .cc
+
+ /*
+ * When the shared capacity is over, the local queue delays
+ * further dispatching until better times
+ *
+ * \orig_tail -- the value group tail rover had when it happened
+ * \cap -- the capacity that's accounted on the group
+ *
+ * The last field is needed to "rearm" the wait in case
+ * queue decides that it wants to dispatch another capacity
+ * in the middle of the waiting
+ */
+ struct pending {
+ fair_group_rover orig_tail;
+ fair_queue_ticket cap;
+
+ pending(fair_group_rover t, fair_queue_ticket c) noexcept : orig_tail(t), cap(c) {}
+ };
- priority_class_ptr pop_priority_class();
+ std::optional<pending> _pending; // engaged when there is a wait to account for; next_pending_aio() tests it
- float normalize_factor() const;
+ void push_priority_class(priority_class_data& pc); // takes the class data by reference (the old overload took a priority_class_ptr)
+ void pop_priority_class(priority_class_data& pc); // takes the class to pop as an argument (the old overload returned a priority_class_ptr)
void normalize_stats();
- bool can_dispatch() const;
+ /// \returns the estimated time needed to process the ticket \c desc,
+ /// derived from the weight and size paces of this queue's config
+ std::chrono::microseconds duration(fair_queue_ticket desc) const noexcept {
+ const float weight_pace = _config.ticket_weight_pace;
+ const float size_pace = _config.ticket_size_pace;
+ return desc.duration_at_pace(weight_pace, size_pace);
+ }
+
+ bool grab_capacity(fair_queue_ticket cap) noexcept; // NOTE(review): presumably returns true when \c cap could be taken from the group -- confirm in the .cc
+ bool grab_pending_capacity(fair_queue_ticket cap) noexcept; // NOTE(review): presumably the variant used while _pending is engaged -- confirm in the .cc
public:
/// Constructs a fair queue with configuration parameters \c cfg.
///
/// \param cfg an instance of the class \ref config
- explicit fair_queue(config cfg);
-
- /// Constructs a fair queue with a given \c capacity, expressed in IOPS.
- ///
- /// \param capacity how many concurrent requests are allowed in this queue.
- /// \param tau the queue exponential decay parameter, as in exp(-1/tau * t)
- explicit fair_queue(unsigned capacity, std::chrono::microseconds tau = std::chrono::milliseconds(100))
- : fair_queue(config{tau, capacity}) {}
+ explicit fair_queue(fair_group& shared, config cfg); // \c shared is the fair_group to attach to; NOTE(review): presumably bound to _group, so it must outlive the queue -- confirm
+ fair_queue(fair_queue&&); // move constructor, declared here without a body
+ ~fair_queue(); // destructor, declared here without a body
/// Registers a priority class against this fair queue.
///
/// \param shares how many shares to create this class with
- priority_class_ptr register_priority_class(uint32_t shares);
+ void register_priority_class(class_id c, uint32_t shares); // the caller now supplies the class_id \c c; nothing is returned any more
/// Unregister a priority class.
///
/// It is illegal to unregister a priority class that still have pending requests.
- void unregister_priority_class(priority_class_ptr pclass);
+ void unregister_priority_class(class_id c); // the class is identified by its class_id instead of a pointer
+
+ void update_shares_for_class(class_id c, uint32_t new_shares); // NOTE(review): presumably takes over from the removed priority_class::update_shares() -- confirm in the .cc
/// \return how many waiters are currently queued for all classes.
[[deprecated("fair_queue users should not track individual requests, but resources (weight, size) passing through the queue")]]
/// \return the amount of resources (weight, size) currently executing
fair_queue_ticket resources_currently_executing() const;
- /// Queue the function \c func through this class' \ref fair_queue, with weight \c weight
- ///
- /// It is expected that \c func doesn't throw. If it does throw, it will be just removed from
- /// the queue and discarded.
+ /// Queue the entry \c ent through this class' \ref fair_queue
///
/// The user of this interface is supposed to call \ref notify_requests_finished when the
/// request finishes executing - regardless of success or failure.
- void queue(priority_class_ptr pc, fair_queue_ticket desc, noncopyable_function<void()> func);
+ void queue(class_id c, fair_queue_entry& ent); // \c ent is passed by reference; the finish notification is now spelled notify_request_finished()
/// Notifies that ont request finished
/// \param desc an instance of \c fair_queue_ticket structure describing the request that just finished.
- void notify_requests_finished(fair_queue_ticket desc, unsigned nr = 1) noexcept;
+ void notify_request_finished(fair_queue_ticket desc) noexcept; // single-request form: the old \c nr count parameter is gone
+ void notify_request_cancelled(fair_queue_entry& ent) noexcept; // NOTE(review): presumably tells the queue that the queued entry \c ent will not be executed -- confirm in the .cc
/// Try to execute new requests if there is capacity left in the queue.
- void dispatch_requests();
+ void dispatch_requests(std::function<void(fair_queue_entry&)> cb); // NOTE(review): presumably \c cb is invoked with each entry that gets dispatched -- confirm in the .cc
+
+ /// \returns the time point at which the pending wait is expected to be over,
+ /// or the maximum representable time point when nothing is pending.
+ clock_type next_pending_aio() const noexcept {
+ if (!_pending) {
+ return std::chrono::steady_clock::time_point::max();
+ }
+
+ /*
+ * The disk is expected to release the ticket within some time,
+ * and it is fine if it does not -- the pending wait still needs
+ * the head rover to get ahead of the wanted value.
+ *
+ * The capacity may also be released earlier than estimated; in
+ * that case the full value is waited for again, which is
+ * sub-optimal. The expectation is that we think the disk works
+ * faster than it really does.
+ */
+ const fair_group_rover wanted_head = _pending->orig_tail + _pending->cap;
+ const fair_queue_ticket shortage = wanted_head.maybe_ahead_of(_group.head());
+ return std::chrono::steady_clock::now() + duration(shortage);
+ }
};
/// @}