]> git.proxmox.com Git - ceph.git/blame - ceph/src/common/WeightedPriorityQueue.h
import 15.2.0 Octopus source
[ceph.git] / ceph / src / common / WeightedPriorityQueue.h
CommitLineData
7c673cae
FG
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
14
15#ifndef WP_QUEUE_H
16#define WP_QUEUE_H
17
#include "OpQueue.h"

#include <cstdlib>
#include <ctime>
#include <list>
#include <utility>

#include <boost/intrusive/list.hpp>
#include <boost/intrusive/rbtree.hpp>
#include <boost/intrusive/avl_set.hpp>

#include "include/ceph_assert.h"
25
7c673cae
FG
26namespace bi = boost::intrusive;
27
/**
 * Heterogeneous "less than" comparator between a bare key and an element.
 *
 * Lets an ordered container be searched by key without building an element
 * first.  Both argument orders are provided so the comparator can be used
 * as comp(key, element) and comp(element, key).
 *
 * @tparam T  element type; must expose a public member `key`.
 * @tparam S  key type; must be comparable with `T::key` through operator<.
 */
template <typename T, typename S>
class MapKey
{
  public:
    /// @return true when key @p i orders strictly before element @p k.
    bool operator()(const S &i, const T &k) const
    {
      return i < k.key;
    }
    /// @return true when element @p k orders strictly before key @p i.
    bool operator()(const T &k, const S &i) const
    {
      return k.key < i;
    }
};
41
/**
 * Disposer functor: destroys the object it is handed with `delete`.
 *
 * Used with the *_and_dispose() calls of the intrusive containers in this
 * file, whose nodes are allocated with `new`.
 *
 * @tparam T  type of the object to delete.
 */
template <typename T>
class DelItem
{
  public:
    /// Deletes @p delete_this; passing a null pointer is a no-op.
    void operator()(T* delete_this) const
    { delete delete_this; }
};
49
50template <typename T, typename K>
51class WeightedPriorityQueue : public OpQueue <T, K>
52{
53 private:
54 class ListPair : public bi::list_base_hook<>
55 {
56 public:
57 unsigned cost;
58 T item;
11fdf7f2 59 ListPair(unsigned c, T&& i) :
7c673cae 60 cost(c),
11fdf7f2
TL
61 item(std::move(i))
62 {}
7c673cae
FG
63 };
64 class Klass : public bi::set_base_hook<>
65 {
66 typedef bi::list<ListPair> ListPairs;
67 typedef typename ListPairs::iterator Lit;
68 public:
69 K key; // klass
70 ListPairs lp;
71 Klass(K& k) :
f64942e4
AA
72 key(k) {
73 }
74 ~Klass() {
75 lp.clear_and_dispose(DelItem<ListPair>());
76 }
7c673cae
FG
77 friend bool operator< (const Klass &a, const Klass &b)
78 { return a.key < b.key; }
79 friend bool operator> (const Klass &a, const Klass &b)
80 { return a.key > b.key; }
81 friend bool operator== (const Klass &a, const Klass &b)
82 { return a.key == b.key; }
11fdf7f2 83 void insert(unsigned cost, T&& item, bool front) {
7c673cae 84 if (front) {
11fdf7f2 85 lp.push_front(*new ListPair(cost, std::move(item)));
7c673cae 86 } else {
11fdf7f2 87 lp.push_back(*new ListPair(cost, std::move(item)));
7c673cae
FG
88 }
89 }
90 //Get the cost of the next item to dequeue
91 unsigned get_cost() const {
11fdf7f2 92 ceph_assert(!empty());
7c673cae
FG
93 return lp.begin()->cost;
94 }
95 T pop() {
11fdf7f2
TL
96 ceph_assert(!lp.empty());
97 T ret = std::move(lp.begin()->item);
7c673cae
FG
98 lp.erase_and_dispose(lp.begin(), DelItem<ListPair>());
99 return ret;
100 }
101 bool empty() const {
102 return lp.empty();
103 }
104 unsigned get_size() const {
105 return lp.size();
106 }
11fdf7f2 107 void filter_class(std::list<T>* out) {
7c673cae
FG
108 for (Lit i = --lp.end();; --i) {
109 if (out) {
11fdf7f2 110 out->push_front(std::move(i->item));
7c673cae
FG
111 }
112 i = lp.erase_and_dispose(i, DelItem<ListPair>());
7c673cae
FG
113 if (i == lp.begin()) {
114 break;
115 }
116 }
7c673cae
FG
117 }
118 };
119 class SubQueue : public bi::set_base_hook<>
120 {
121 typedef bi::rbtree<Klass> Klasses;
122 typedef typename Klasses::iterator Kit;
123 void check_end() {
124 if (next == klasses.end()) {
125 next = klasses.begin();
126 }
127 }
128 public:
129 unsigned key; // priority
130 Klasses klasses;
131 Kit next;
132 SubQueue(unsigned& p) :
133 key(p),
f64942e4
AA
134 next(klasses.begin()) {
135 }
136 ~SubQueue() {
137 klasses.clear_and_dispose(DelItem<Klass>());
138 }
7c673cae
FG
139 friend bool operator< (const SubQueue &a, const SubQueue &b)
140 { return a.key < b.key; }
141 friend bool operator> (const SubQueue &a, const SubQueue &b)
142 { return a.key > b.key; }
143 friend bool operator== (const SubQueue &a, const SubQueue &b)
144 { return a.key == b.key; }
145 bool empty() const {
146 return klasses.empty();
147 }
11fdf7f2 148 void insert(K cl, unsigned cost, T&& item, bool front = false) {
7c673cae
FG
149 typename Klasses::insert_commit_data insert_data;
150 std::pair<Kit, bool> ret =
151 klasses.insert_unique_check(cl, MapKey<Klass, K>(), insert_data);
152 if (ret.second) {
153 ret.first = klasses.insert_unique_commit(*new Klass(cl), insert_data);
154 check_end();
155 }
11fdf7f2 156 ret.first->insert(cost, std::move(item), front);
7c673cae
FG
157 }
158 unsigned get_cost() const {
11fdf7f2 159 ceph_assert(!empty());
7c673cae
FG
160 return next->get_cost();
161 }
162 T pop() {
163 T ret = next->pop();
164 if (next->empty()) {
165 next = klasses.erase_and_dispose(next, DelItem<Klass>());
166 } else {
167 ++next;
168 }
169 check_end();
170 return ret;
171 }
11fdf7f2 172 void filter_class(K& cl, std::list<T>* out) {
7c673cae
FG
173 Kit i = klasses.find(cl, MapKey<Klass, K>());
174 if (i != klasses.end()) {
11fdf7f2 175 i->filter_class(out);
7c673cae
FG
176 Kit tmp = klasses.erase_and_dispose(i, DelItem<Klass>());
177 if (next == i) {
178 next = tmp;
179 }
180 check_end();
181 }
11fdf7f2
TL
182 }
183 // this is intended for unit tests and should be never used on hot paths
184 unsigned get_size_slow() const {
185 unsigned count = 0;
186 for (const auto& klass : klasses) {
187 count += klass.get_size();
188 }
7c673cae
FG
189 return count;
190 }
191 void dump(ceph::Formatter *f) const {
192 f->dump_int("num_keys", next->get_size());
193 if (!empty()) {
194 f->dump_int("first_item_cost", next->get_cost());
195 }
196 }
197 };
198 class Queue {
199 typedef bi::rbtree<SubQueue> SubQueues;
200 typedef typename SubQueues::iterator Sit;
201 SubQueues queues;
202 unsigned total_prio;
203 unsigned max_cost;
204 public:
7c673cae
FG
205 Queue() :
206 total_prio(0),
11fdf7f2 207 max_cost(0) {
f64942e4
AA
208 }
209 ~Queue() {
210 queues.clear_and_dispose(DelItem<SubQueue>());
211 }
7c673cae 212 bool empty() const {
11fdf7f2 213 return queues.empty();
7c673cae 214 }
11fdf7f2 215 void insert(unsigned p, K cl, unsigned cost, T&& item, bool front = false) {
7c673cae
FG
216 typename SubQueues::insert_commit_data insert_data;
217 std::pair<typename SubQueues::iterator, bool> ret =
218 queues.insert_unique_check(p, MapKey<SubQueue, unsigned>(), insert_data);
219 if (ret.second) {
220 ret.first = queues.insert_unique_commit(*new SubQueue(p), insert_data);
221 total_prio += p;
222 }
11fdf7f2 223 ret.first->insert(cl, cost, std::move(item), front);
7c673cae
FG
224 if (cost > max_cost) {
225 max_cost = cost;
226 }
7c673cae
FG
227 }
228 T pop(bool strict = false) {
7c673cae
FG
229 Sit i = --queues.end();
230 if (strict) {
231 T ret = i->pop();
232 if (i->empty()) {
233 queues.erase_and_dispose(i, DelItem<SubQueue>());
234 }
235 return ret;
236 }
237 if (queues.size() > 1) {
238 while (true) {
239 // Pick a new priority out of the total priority.
240 unsigned prio = rand() % total_prio + 1;
241 unsigned tp = total_prio - i->key;
11fdf7f2 242 // Find the priority corresponding to the picked number.
7c673cae
FG
243 // Subtract high priorities to low priorities until the picked number
244 // is more than the total and try to dequeue that priority.
245 // Reverse the direction from previous implementation because there is a higher
246 // chance of dequeuing a high priority op so spend less time spinning.
247 while (prio <= tp) {
248 --i;
249 tp -= i->key;
250 }
251 // Flip a coin to see if this priority gets to run based on cost.
252 // The next op's cost is multiplied by .9 and subtracted from the
253 // max cost seen. Ops with lower costs will have a larger value
254 // and allow them to be selected easier than ops with high costs.
255 if (max_cost == 0 || rand() % max_cost <=
256 (max_cost - ((i->get_cost() * 9) / 10))) {
257 break;
258 }
259 i = --queues.end();
260 }
261 }
262 T ret = i->pop();
263 if (i->empty()) {
264 total_prio -= i->key;
265 queues.erase_and_dispose(i, DelItem<SubQueue>());
266 }
267 return ret;
268 }
7c673cae
FG
269 void filter_class(K& cl, std::list<T>* out) {
270 for (Sit i = queues.begin(); i != queues.end();) {
11fdf7f2 271 i->filter_class(cl, out);
7c673cae
FG
272 if (i->empty()) {
273 total_prio -= i->key;
274 i = queues.erase_and_dispose(i, DelItem<SubQueue>());
275 } else {
276 ++i;
277 }
278 }
279 }
11fdf7f2
TL
280 // this is intended for unit tests and should be never used on hot paths
281 unsigned get_size_slow() const {
282 unsigned count = 0;
283 for (const auto& queue : queues) {
284 count += queue.get_size_slow();
285 }
286 return count;
287 }
7c673cae
FG
288 void dump(ceph::Formatter *f) const {
289 for (typename SubQueues::const_iterator i = queues.begin();
290 i != queues.end(); ++i) {
291 f->dump_int("total_priority", total_prio);
292 f->dump_int("max_cost", max_cost);
293 f->open_object_section("subqueue");
294 f->dump_int("priority", i->key);
295 i->dump(f);
296 f->close_section();
297 }
298 }
299 };
300
301 Queue strict;
302 Queue normal;
303 public:
304 WeightedPriorityQueue(unsigned max_per, unsigned min_c) :
305 strict(),
306 normal()
307 {
308 std::srand(time(0));
309 }
7c673cae
FG
310 void remove_by_class(K cl, std::list<T>* removed = 0) final {
311 strict.filter_class(cl, removed);
312 normal.filter_class(cl, removed);
313 }
314 bool empty() const final {
11fdf7f2 315 return strict.empty() && normal.empty();
7c673cae 316 }
11fdf7f2
TL
317 void enqueue_strict(K cl, unsigned p, T&& item) final {
318 strict.insert(p, cl, 0, std::move(item));
7c673cae 319 }
11fdf7f2
TL
320 void enqueue_strict_front(K cl, unsigned p, T&& item) final {
321 strict.insert(p, cl, 0, std::move(item), true);
7c673cae 322 }
11fdf7f2
TL
323 void enqueue(K cl, unsigned p, unsigned cost, T&& item) final {
324 normal.insert(p, cl, cost, std::move(item));
7c673cae 325 }
11fdf7f2
TL
326 void enqueue_front(K cl, unsigned p, unsigned cost, T&& item) final {
327 normal.insert(p, cl, cost, std::move(item), true);
7c673cae
FG
328 }
329 T dequeue() override {
11fdf7f2 330 ceph_assert(!empty());
7c673cae
FG
331 if (!strict.empty()) {
332 return strict.pop(true);
333 }
334 return normal.pop();
335 }
11fdf7f2
TL
336 unsigned get_size_slow() {
337 return strict.get_size_slow() + normal.get_size_slow();
338 }
7c673cae
FG
339 void dump(ceph::Formatter *f) const override {
340 f->open_array_section("high_queues");
341 strict.dump(f);
342 f->close_section();
343 f->open_array_section("queues");
344 normal.dump(f);
345 f->close_section();
346 }
9f95a23c
TL
347
348 void print(std::ostream &ostream) const final {
349 ostream << "WeightedPriorityQueue";
350 }
7c673cae
FG
351};
352
353#endif