]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/seastar/include/seastar/core/fair_queue.hh
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / include / seastar / core / fair_queue.hh
index 85e47b2aadb1650af106952a115e4317726809b3..8cf3d1a7a8d413665fc7ace32dbe87731871c3f9 100644 (file)
  */
 #pragma once
 
+#include <boost/intrusive/slist.hpp>
 #include <seastar/core/shared_ptr.hh>
 #include <seastar/core/circular_buffer.hh>
-#include <seastar/util/noncopyable_function.hh>
+#include <functional>
+#include <atomic>
 #include <queue>
 #include <chrono>
 #include <unordered_set>
+#include <optional>
+
+namespace bi = boost::intrusive;
 
 namespace seastar {
 
+class fair_group_rover;
+
 /// \brief describes a request that passes through the \ref fair_queue.
 ///
 /// A ticket is specified by a \c weight and a \c size. For example, one can specify a request of \c weight
@@ -40,35 +47,33 @@ namespace seastar {
 class fair_queue_ticket {
     uint32_t _weight = 0; ///< the total weight of these requests for capacity purposes (IOPS).
     uint32_t _size = 0;        ///< the total effective size of these requests
+    friend class fair_group_rover;
 public:
     /// Constructs a fair_queue_ticket with a given \c weight and a given \c size
     ///
     /// \param weight the weight of the request
     /// \param size the size of the request
-    fair_queue_ticket(uint32_t weight, uint32_t size);
-    fair_queue_ticket() {}
-    fair_queue_ticket operator+(fair_queue_ticket desc) const;
-    fair_queue_ticket operator-(fair_queue_ticket desc) const;
+    fair_queue_ticket(uint32_t weight, uint32_t size) noexcept;
+    fair_queue_ticket() noexcept {}
+    fair_queue_ticket operator+(fair_queue_ticket desc) const noexcept;
+    fair_queue_ticket operator-(fair_queue_ticket desc) const noexcept;
     /// Increase the quantity represented in this ticket by the amount represented by \c desc
     /// \param desc another \ref fair_queue_ticket whose \c weight \c and size will be added to this one
-    fair_queue_ticket& operator+=(fair_queue_ticket desc);
+    fair_queue_ticket& operator+=(fair_queue_ticket desc) noexcept;
     /// Decreases the quantity represented in this ticket by the amount represented by \c desc
     /// \param desc another \ref fair_queue_ticket whose \c weight \c and size will be decremented from this one
-    fair_queue_ticket& operator-=(fair_queue_ticket desc);
+    fair_queue_ticket& operator-=(fair_queue_ticket desc) noexcept;
+    /// Checks if the tickets fully equals to another one
+    /// \param desc another \ref fair_queue_ticket to compare with
+    bool operator==(const fair_queue_ticket& desc) const noexcept;
 
-    /// \returns true if this fair_queue_ticket is strictly less than \c rhs.
-    ///
-    /// For a fair_queue_ticket to be considered strictly less than another, both its quantities need to be
-    /// less than the other. Note that there is no total ordering between two fair_queue_tickets
-    //
-    /// \param rhs another \ref fair_queue_ticket to be compared to this one.
-    bool strictly_less(fair_queue_ticket rhs) const;
+    std::chrono::microseconds duration_at_pace(float weight_pace, float size_pace) const noexcept;
 
     /// \returns true if the fair_queue_ticket represents a non-zero quantity.
     ///
     /// For a fair_queue ticket to be non-zero, at least one of its represented quantities need to
     /// be non-zero
-    explicit operator bool() const;
+    explicit operator bool() const noexcept;
 
     friend std::ostream& operator<<(std::ostream& os, fair_queue_ticket t);
 
@@ -85,48 +90,89 @@ public:
     ///
     /// It is however not legal for the axis to have any quantity set to zero.
     /// \param axis another \ref fair_queue_ticket to be used as a a base vector against which to normalize this fair_queue_ticket.
-    float normalize(fair_queue_ticket axis) const;
+    float normalize(fair_queue_ticket axis) const noexcept;
+};
+
+class fair_group_rover {
+    uint32_t _weight = 0;
+    uint32_t _size = 0;
+
+public:
+    fair_group_rover(uint32_t weight, uint32_t size) noexcept;
+
+    /*
+     * For both dimentions checks if the current rover is ahead of the
+     * other and returns the difference. If this is behind returns zero.
+     */
+    fair_queue_ticket maybe_ahead_of(const fair_group_rover& other) const noexcept;
+    fair_group_rover operator+(fair_queue_ticket t) const noexcept;
+    fair_group_rover& operator+=(fair_queue_ticket t) noexcept;
+
+    friend std::ostream& operator<<(std::ostream& os, fair_group_rover r);
 };
 
 /// \addtogroup io-module
 /// @{
 
-/// \cond internal
-class priority_class {
-    struct request {
-        noncopyable_function<void()> func;
-        fair_queue_ticket desc;
-    };
+class fair_queue_entry {
     friend class fair_queue;
-    uint32_t _shares = 0;
-    float _accumulated = 0;
-    circular_buffer<request> _queue;
-    bool _queued = false;
 
-    friend struct shared_ptr_no_esft<priority_class>;
-    explicit priority_class(uint32_t shares) noexcept : _shares(std::max(shares, 1u)) {}
+    fair_queue_ticket _ticket;
+    bi::slist_member_hook<> _hook;
 
 public:
-    /// \brief return the current amount of shares for this priority class
-    uint32_t shares() const noexcept {
-        return _shares;
-    }
-
-    void update_shares(uint32_t shares) noexcept {
-        _shares = (std::max(shares, 1u));
-    }
+    fair_queue_entry(fair_queue_ticket t) noexcept
+        : _ticket(std::move(t)) {}
+    using container_list_t = bi::slist<fair_queue_entry,
+            bi::constant_time_size<false>,
+            bi::cache_last<true>,
+            bi::member_hook<fair_queue_entry, bi::slist_member_hook<>, &fair_queue_entry::_hook>>;
+
+    fair_queue_ticket ticket() const noexcept { return _ticket; }
 };
-/// \endcond
 
-/// \brief Priority class, to be used with a given \ref fair_queue
+/// \brief Group of queues class
 ///
-/// An instance of this class is associated with a given \ref fair_queue. When registering
-/// a class, the caller will receive a \ref lw_shared_ptr to an object of this class. All its methods
-/// are private, so the only thing the caller is expected to do with it is to pass it later
-/// to the \ref fair_queue to identify a given class.
+/// This is a fair group. It's attached by one or mode fair queues. On machines having the
+/// big* amount of shards, queues use the group to borrow/lend the needed capacity for
+/// requests dispatching.
 ///
-/// \related fair_queue
-using priority_class_ptr = lw_shared_ptr<priority_class>;
+/// * Big means that when all shards sumbit requests alltogether the disk is unable to
+/// dispatch them efficiently. The inability can be of two kinds -- either disk cannot
+/// cope with the number of arriving requests, or the total size of the data withing
+/// the given time frame exceeds the disk throughput.
+class fair_group {
+    using fair_group_atomic_rover = std::atomic<fair_group_rover>;
+    static_assert(fair_group_atomic_rover::is_always_lock_free);
+
+    fair_group_atomic_rover _capacity_tail;
+    fair_group_atomic_rover _capacity_head;
+    fair_queue_ticket _maximum_capacity;
+
+public:
+    struct config {
+        unsigned max_req_count;
+        unsigned max_bytes_count;
+
+        /// Constructs a config with the given \c capacity, expressed in maximum
+        /// values for requests and bytes.
+        ///
+        /// \param max_requests how many concurrent requests are allowed in this queue.
+        /// \param max_bytes how many total bytes are allowed in this queue.
+        config(unsigned max_requests, unsigned max_bytes) noexcept
+                : max_req_count(max_requests), max_bytes_count(max_bytes) {}
+    };
+    explicit fair_group(config cfg) noexcept;
+    fair_group(fair_group&&) = delete;
+
+    fair_queue_ticket maximum_capacity() const noexcept { return _maximum_capacity; }
+    fair_group_rover grab_capacity(fair_queue_ticket cap) noexcept;
+    void release_capacity(fair_queue_ticket cap) noexcept;
+
+    fair_group_rover head() const noexcept {
+        return _capacity_head.load(std::memory_order_relaxed);
+    }
+};
 
 /// \brief Fair queuing class
 ///
@@ -137,7 +183,7 @@ using priority_class_ptr = lw_shared_ptr<priority_class>;
 /// 1 share. Higher weights for a request will consume a proportionally higher amount of
 /// shares.
 ///
-/// The user of this interface is expected to register multiple `priority_class`
+/// The user of this interface is expected to register multiple `priority_class_data`
 /// objects, which will each have a shares attribute.
 ///
 /// Internally, each priority class may keep a separate queue of requests.
@@ -154,22 +200,23 @@ public:
     /// \sets the operation parameters of a \ref fair_queue
     /// \related fair_queue
     struct config {
-        std::chrono::microseconds tau = std::chrono::milliseconds(100);
-        unsigned max_req_count = std::numeric_limits<unsigned>::max();
-        unsigned max_bytes_count = std::numeric_limits<unsigned>::max();
+        std::chrono::microseconds tau = std::chrono::milliseconds(5);
+        // Time (in microseconds) is takes to process one ticket value
+        float ticket_size_pace;
+        float ticket_weight_pace;
     };
-private:
-    friend priority_class;
 
+    using class_id = unsigned int;
+    class priority_class_data;
+
+private:
+    using priority_class_ptr = priority_class_data*;
     struct class_compare {
-        bool operator() (const priority_class_ptr& lhs, const priority_class_ptr& rhs) const {
-            return lhs->_accumulated > rhs->_accumulated;
-        }
+        bool operator() (const priority_class_ptr& lhs, const priority_class_ptr & rhs) const noexcept;
     };
 
     config _config;
-    fair_queue_ticket _maximum_capacity;
-    fair_queue_ticket _current_capacity;
+    fair_group& _group;
     fair_queue_ticket _resources_executing;
     fair_queue_ticket _resources_queued;
     unsigned _requests_executing = 0;
@@ -178,39 +225,59 @@ private:
     clock_type _base;
     using prioq = std::priority_queue<priority_class_ptr, std::vector<priority_class_ptr>, class_compare>;
     prioq _handles;
-    std::unordered_set<priority_class_ptr> _all_classes;
-
-    void push_priority_class(priority_class_ptr pc);
+    std::vector<std::unique_ptr<priority_class_data>> _priority_classes;
+
+    /*
+     * When the shared capacity os over the local queue delays
+     * further dispatching untill better times
+     *
+     * \orig_tail  -- the value group tail rover had when it happened
+     * \cap   -- the capacity that's accounted on the group
+     *
+     * The last field is needed to "rearm" the wait in case
+     * queue decides that it wants to dispatch another capacity
+     * in the middle of the waiting
+     */
+    struct pending {
+        fair_group_rover orig_tail;
+        fair_queue_ticket cap;
+
+        pending(fair_group_rover t, fair_queue_ticket c) noexcept : orig_tail(t), cap(c) {}
+    };
 
-    priority_class_ptr pop_priority_class();
+    std::optional<pending> _pending;
 
-    float normalize_factor() const;
+    void push_priority_class(priority_class_data& pc);
+    void pop_priority_class(priority_class_data& pc);
 
     void normalize_stats();
 
-    bool can_dispatch() const;
+    // Estimated time to process the given ticket
+    std::chrono::microseconds duration(fair_queue_ticket desc) const noexcept {
+        return desc.duration_at_pace(_config.ticket_weight_pace, _config.ticket_size_pace);
+    }
+
+    bool grab_capacity(fair_queue_ticket cap) noexcept;
+    bool grab_pending_capacity(fair_queue_ticket cap) noexcept;
 public:
     /// Constructs a fair queue with configuration parameters \c cfg.
     ///
     /// \param cfg an instance of the class \ref config
-    explicit fair_queue(config cfg);
-
-    /// Constructs a fair queue with a given \c capacity, expressed in IOPS.
-    ///
-    /// \param capacity how many concurrent requests are allowed in this queue.
-    /// \param tau the queue exponential decay parameter, as in exp(-1/tau * t)
-    explicit fair_queue(unsigned capacity, std::chrono::microseconds tau = std::chrono::milliseconds(100))
-        : fair_queue(config{tau, capacity}) {}
+    explicit fair_queue(fair_group& shared, config cfg);
+    fair_queue(fair_queue&&);
+    ~fair_queue();
 
     /// Registers a priority class against this fair queue.
     ///
     /// \param shares how many shares to create this class with
-    priority_class_ptr register_priority_class(uint32_t shares);
+    void register_priority_class(class_id c, uint32_t shares);
 
     /// Unregister a priority class.
     ///
     /// It is illegal to unregister a priority class that still have pending requests.
-    void unregister_priority_class(priority_class_ptr pclass);
+    void unregister_priority_class(class_id c);
+
+    void update_shares_for_class(class_id c, uint32_t new_shares);
 
     /// \return how many waiters are currently queued for all classes.
     [[deprecated("fair_queue users should not track individual requests, but resources (weight, size) passing through the queue")]]
@@ -226,21 +293,39 @@ public:
     /// \return the amount of resources (weight, size) currently executing
     fair_queue_ticket resources_currently_executing() const;
 
-    /// Queue the function \c func through this class' \ref fair_queue, with weight \c weight
-    ///
-    /// It is expected that \c func doesn't throw. If it does throw, it will be just removed from
-    /// the queue and discarded.
+    /// Queue the entry \c ent through this class' \ref fair_queue
     ///
     /// The user of this interface is supposed to call \ref notify_requests_finished when the
     /// request finishes executing - regardless of success or failure.
-    void queue(priority_class_ptr pc, fair_queue_ticket desc, noncopyable_function<void()> func);
+    void queue(class_id c, fair_queue_entry& ent);
 
     /// Notifies that ont request finished
     /// \param desc an instance of \c fair_queue_ticket structure describing the request that just finished.
-    void notify_requests_finished(fair_queue_ticket desc, unsigned nr = 1) noexcept;
+    void notify_request_finished(fair_queue_ticket desc) noexcept;
+    void notify_request_cancelled(fair_queue_entry& ent) noexcept;
 
     /// Try to execute new requests if there is capacity left in the queue.
-    void dispatch_requests();
+    void dispatch_requests(std::function<void(fair_queue_entry&)> cb);
+
+    clock_type next_pending_aio() const noexcept {
+        if (_pending) {
+            /*
+             * We expect the disk to release the ticket within some time,
+             * but it's ... OK if it doesn't -- the pending wait still
+             * needs the head rover value to be ahead of the needed value.
+             *
+             * It may happen that the capacity gets released before we think
+             * it will, in this case we will wait for the full value again,
+             * which's sub-optimal. The expectation is that we think disk
+             * works faster, than it really does.
+             */
+            fair_group_rover pending_head = _pending->orig_tail + _pending->cap;
+            fair_queue_ticket over = pending_head.maybe_ahead_of(_group.head());
+            return std::chrono::steady_clock::now() + duration(over);
+        }
+
+        return std::chrono::steady_clock::time_point::max();
+    }
 };
 /// @}