]> git.proxmox.com Git - ceph.git/blob - ceph/src/common/WeightedPriorityQueue.h
update sources to v12.1.1
[ceph.git] / ceph / src / common / WeightedPriorityQueue.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef WP_QUEUE_H
16 #define WP_QUEUE_H
17
18 #include "OpQueue.h"
19
20 #include <boost/intrusive/list.hpp>
21 #include <boost/intrusive/rbtree.hpp>
22 #include <boost/intrusive/avl_set.hpp>
23
24 namespace bi = boost::intrusive;
25
26 template <typename T, typename S>
27 class MapKey
28 {
29 public:
30 bool operator()(const S i, const T &k) const
31 {
32 return i < k.key;
33 }
34 bool operator()(const T &k, const S i) const
35 {
36 return k.key < i;
37 }
38 };
39
40 template <typename T>
41 class DelItem
42 {
43 public:
44 void operator()(T* delete_this)
45 { delete delete_this; }
46 };
47
48 template <typename T, typename K>
49 class WeightedPriorityQueue : public OpQueue <T, K>
50 {
51 private:
52 class ListPair : public bi::list_base_hook<>
53 {
54 public:
55 unsigned cost;
56 T item;
57 ListPair(unsigned c, T& i) :
58 cost(c),
59 item(i)
60 {}
61 };
62 class Klass : public bi::set_base_hook<>
63 {
64 typedef bi::list<ListPair> ListPairs;
65 typedef typename ListPairs::iterator Lit;
66 public:
67 K key; // klass
68 ListPairs lp;
69 Klass(K& k) :
70 key(k)
71 {}
72 friend bool operator< (const Klass &a, const Klass &b)
73 { return a.key < b.key; }
74 friend bool operator> (const Klass &a, const Klass &b)
75 { return a.key > b.key; }
76 friend bool operator== (const Klass &a, const Klass &b)
77 { return a.key == b.key; }
78 void insert(unsigned cost, T& item, bool front) {
79 if (front) {
80 lp.push_front(*new ListPair(cost, item));
81 } else {
82 lp.push_back(*new ListPair(cost, item));
83 }
84 }
85 //Get the cost of the next item to dequeue
86 unsigned get_cost() const {
87 assert(!empty());
88 return lp.begin()->cost;
89 }
90 T pop() {
91 assert(!lp.empty());
92 T ret = lp.begin()->item;
93 lp.erase_and_dispose(lp.begin(), DelItem<ListPair>());
94 return ret;
95 }
96 bool empty() const {
97 return lp.empty();
98 }
99 unsigned get_size() const {
100 return lp.size();
101 }
102 unsigned filter_class(std::list<T>* out) {
103 unsigned count = 0;
104 for (Lit i = --lp.end();; --i) {
105 if (out) {
106 out->push_front(i->item);
107 }
108 i = lp.erase_and_dispose(i, DelItem<ListPair>());
109 ++count;
110 if (i == lp.begin()) {
111 break;
112 }
113 }
114 return count;
115 }
116 };
117 class SubQueue : public bi::set_base_hook<>
118 {
119 typedef bi::rbtree<Klass> Klasses;
120 typedef typename Klasses::iterator Kit;
121 void check_end() {
122 if (next == klasses.end()) {
123 next = klasses.begin();
124 }
125 }
126 public:
127 unsigned key; // priority
128 Klasses klasses;
129 Kit next;
130 SubQueue(unsigned& p) :
131 key(p),
132 next(klasses.begin())
133 {}
134 friend bool operator< (const SubQueue &a, const SubQueue &b)
135 { return a.key < b.key; }
136 friend bool operator> (const SubQueue &a, const SubQueue &b)
137 { return a.key > b.key; }
138 friend bool operator== (const SubQueue &a, const SubQueue &b)
139 { return a.key == b.key; }
140 bool empty() const {
141 return klasses.empty();
142 }
143 void insert(K cl, unsigned cost, T& item, bool front = false) {
144 typename Klasses::insert_commit_data insert_data;
145 std::pair<Kit, bool> ret =
146 klasses.insert_unique_check(cl, MapKey<Klass, K>(), insert_data);
147 if (ret.second) {
148 ret.first = klasses.insert_unique_commit(*new Klass(cl), insert_data);
149 check_end();
150 }
151 ret.first->insert(cost, item, front);
152 }
153 unsigned get_cost() const {
154 assert(!empty());
155 return next->get_cost();
156 }
157 T pop() {
158 T ret = next->pop();
159 if (next->empty()) {
160 next = klasses.erase_and_dispose(next, DelItem<Klass>());
161 } else {
162 ++next;
163 }
164 check_end();
165 return ret;
166 }
167 unsigned filter_class(K& cl, std::list<T>* out) {
168 unsigned count = 0;
169 Kit i = klasses.find(cl, MapKey<Klass, K>());
170 if (i != klasses.end()) {
171 count = i->filter_class(out);
172 Kit tmp = klasses.erase_and_dispose(i, DelItem<Klass>());
173 if (next == i) {
174 next = tmp;
175 }
176 check_end();
177 }
178 return count;
179 }
180 void dump(ceph::Formatter *f) const {
181 f->dump_int("num_keys", next->get_size());
182 if (!empty()) {
183 f->dump_int("first_item_cost", next->get_cost());
184 }
185 }
186 };
187 class Queue {
188 typedef bi::rbtree<SubQueue> SubQueues;
189 typedef typename SubQueues::iterator Sit;
190 SubQueues queues;
191 unsigned total_prio;
192 unsigned max_cost;
193 public:
194 unsigned size;
195 Queue() :
196 total_prio(0),
197 max_cost(0),
198 size(0)
199 {}
200 bool empty() const {
201 return !size;
202 }
203 void insert(unsigned p, K cl, unsigned cost, T& item, bool front = false) {
204 typename SubQueues::insert_commit_data insert_data;
205 std::pair<typename SubQueues::iterator, bool> ret =
206 queues.insert_unique_check(p, MapKey<SubQueue, unsigned>(), insert_data);
207 if (ret.second) {
208 ret.first = queues.insert_unique_commit(*new SubQueue(p), insert_data);
209 total_prio += p;
210 }
211 ret.first->insert(cl, cost, item, front);
212 if (cost > max_cost) {
213 max_cost = cost;
214 }
215 ++size;
216 }
217 T pop(bool strict = false) {
218 --size;
219 Sit i = --queues.end();
220 if (strict) {
221 T ret = i->pop();
222 if (i->empty()) {
223 queues.erase_and_dispose(i, DelItem<SubQueue>());
224 }
225 return ret;
226 }
227 if (queues.size() > 1) {
228 while (true) {
229 // Pick a new priority out of the total priority.
230 unsigned prio = rand() % total_prio + 1;
231 unsigned tp = total_prio - i->key;
232 // Find the priority coresponding to the picked number.
233 // Subtract high priorities to low priorities until the picked number
234 // is more than the total and try to dequeue that priority.
235 // Reverse the direction from previous implementation because there is a higher
236 // chance of dequeuing a high priority op so spend less time spinning.
237 while (prio <= tp) {
238 --i;
239 tp -= i->key;
240 }
241 // Flip a coin to see if this priority gets to run based on cost.
242 // The next op's cost is multiplied by .9 and subtracted from the
243 // max cost seen. Ops with lower costs will have a larger value
244 // and allow them to be selected easier than ops with high costs.
245 if (max_cost == 0 || rand() % max_cost <=
246 (max_cost - ((i->get_cost() * 9) / 10))) {
247 break;
248 }
249 i = --queues.end();
250 }
251 }
252 T ret = i->pop();
253 if (i->empty()) {
254 total_prio -= i->key;
255 queues.erase_and_dispose(i, DelItem<SubQueue>());
256 }
257 return ret;
258 }
259 void filter_class(K& cl, std::list<T>* out) {
260 for (Sit i = queues.begin(); i != queues.end();) {
261 size -= i->filter_class(cl, out);
262 if (i->empty()) {
263 total_prio -= i->key;
264 i = queues.erase_and_dispose(i, DelItem<SubQueue>());
265 } else {
266 ++i;
267 }
268 }
269 }
270 void dump(ceph::Formatter *f) const {
271 for (typename SubQueues::const_iterator i = queues.begin();
272 i != queues.end(); ++i) {
273 f->dump_int("total_priority", total_prio);
274 f->dump_int("max_cost", max_cost);
275 f->open_object_section("subqueue");
276 f->dump_int("priority", i->key);
277 i->dump(f);
278 f->close_section();
279 }
280 }
281 };
282
283 Queue strict;
284 Queue normal;
285 public:
286 WeightedPriorityQueue(unsigned max_per, unsigned min_c) :
287 strict(),
288 normal()
289 {
290 std::srand(time(0));
291 }
292 unsigned length() const final {
293 return strict.size + normal.size;
294 }
295 void remove_by_class(K cl, std::list<T>* removed = 0) final {
296 strict.filter_class(cl, removed);
297 normal.filter_class(cl, removed);
298 }
299 bool empty() const final {
300 return !(strict.size + normal.size);
301 }
302 void enqueue_strict(K cl, unsigned p, T item) final {
303 strict.insert(p, cl, 0, item);
304 }
305 void enqueue_strict_front(K cl, unsigned p, T item) final {
306 strict.insert(p, cl, 0, item, true);
307 }
308 void enqueue(K cl, unsigned p, unsigned cost, T item) final {
309 normal.insert(p, cl, cost, item);
310 }
311 void enqueue_front(K cl, unsigned p, unsigned cost, T item) final {
312 normal.insert(p, cl, cost, item, true);
313 }
314 T dequeue() override {
315 assert(strict.size + normal.size > 0);
316 if (!strict.empty()) {
317 return strict.pop(true);
318 }
319 return normal.pop();
320 }
321 void dump(ceph::Formatter *f) const override {
322 f->open_array_section("high_queues");
323 strict.dump(f);
324 f->close_section();
325 f->open_array_section("queues");
326 normal.dump(f);
327 f->close_section();
328 }
329 };
330
331 #endif