]> git.proxmox.com Git - ceph.git/blob - ceph/src/common/WeightedPriorityQueue.h
10d6f0d4514e4b25063045f9ef8a9bd63214cd95
[ceph.git] / ceph / src / common / WeightedPriorityQueue.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef WP_QUEUE_H
16 #define WP_QUEUE_H
17
#include "OpQueue.h"

#include <cstdint>
#include <utility>

#include <boost/intrusive/list.hpp>
#include <boost/intrusive/rbtree.hpp>
#include <boost/intrusive/avl_set.hpp>

namespace bi = boost::intrusive;
25
// Heterogeneous comparator for boost::intrusive lookups
// (insert_unique_check / find): compares a bare key of type S against the
// `key` member of a node of type T, in either argument order. The key is
// taken by const reference so no copy of it is made on each comparison.
template <typename T, typename S>
class MapKey
{
public:
  bool operator()(const S &i, const T &k) const
  {
    return i < k.key;
  }
  bool operator()(const T &k, const S &i) const
  {
    return k.key < i;
  }
};
39
// Disposer functor for boost::intrusive's *_and_dispose operations:
// deletes a node that was allocated with `new`.
template <typename T>
struct DelItem
{
  void operator()(T* node)
  {
    delete node;
  }
};
47
48 template <typename T, typename K>
49 class WeightedPriorityQueue : public OpQueue <T, K>
50 {
51 private:
52 class ListPair : public bi::list_base_hook<>
53 {
54 public:
55 unsigned cost;
56 T item;
57 ListPair(unsigned c, T& i) :
58 cost(c),
59 item(i)
60 {}
61 };
62 class Klass : public bi::set_base_hook<>
63 {
64 typedef bi::list<ListPair> ListPairs;
65 typedef typename ListPairs::iterator Lit;
66 public:
67 K key; // klass
68 ListPairs lp;
69 Klass(K& k) :
70 key(k)
71 {}
72 friend bool operator< (const Klass &a, const Klass &b)
73 { return a.key < b.key; }
74 friend bool operator> (const Klass &a, const Klass &b)
75 { return a.key > b.key; }
76 friend bool operator== (const Klass &a, const Klass &b)
77 { return a.key == b.key; }
78 void insert(unsigned cost, T& item, bool front) {
79 if (front) {
80 lp.push_front(*new ListPair(cost, item));
81 } else {
82 lp.push_back(*new ListPair(cost, item));
83 }
84 }
85 //Get the cost of the next item to dequeue
86 unsigned get_cost() const {
87 assert(!empty());
88 return lp.begin()->cost;
89 }
90 T pop() {
91 assert(!lp.empty());
92 T ret = lp.begin()->item;
93 lp.erase_and_dispose(lp.begin(), DelItem<ListPair>());
94 return ret;
95 }
96 bool empty() const {
97 return lp.empty();
98 }
99 unsigned get_size() const {
100 return lp.size();
101 }
102 unsigned filter_list_pairs(std::function<bool (T)>& f) {
103 unsigned count = 0;
104 // intrusive containers can't erase with a reverse_iterator
105 // so we have to walk backwards on our own. Since there is
106 // no iterator before begin, we have to test at the end.
107 for (Lit i = --lp.end();; --i) {
108 if (f(i->item)) {
109 i = lp.erase_and_dispose(i, DelItem<ListPair>());
110 ++count;
111 }
112 if (i == lp.begin()) {
113 break;
114 }
115 }
116 return count;
117 }
118 unsigned filter_class(std::list<T>* out) {
119 unsigned count = 0;
120 for (Lit i = --lp.end();; --i) {
121 if (out) {
122 out->push_front(i->item);
123 }
124 i = lp.erase_and_dispose(i, DelItem<ListPair>());
125 ++count;
126 if (i == lp.begin()) {
127 break;
128 }
129 }
130 return count;
131 }
132 };
133 class SubQueue : public bi::set_base_hook<>
134 {
135 typedef bi::rbtree<Klass> Klasses;
136 typedef typename Klasses::iterator Kit;
137 void check_end() {
138 if (next == klasses.end()) {
139 next = klasses.begin();
140 }
141 }
142 public:
143 unsigned key; // priority
144 Klasses klasses;
145 Kit next;
146 SubQueue(unsigned& p) :
147 key(p),
148 next(klasses.begin())
149 {}
150 friend bool operator< (const SubQueue &a, const SubQueue &b)
151 { return a.key < b.key; }
152 friend bool operator> (const SubQueue &a, const SubQueue &b)
153 { return a.key > b.key; }
154 friend bool operator== (const SubQueue &a, const SubQueue &b)
155 { return a.key == b.key; }
156 bool empty() const {
157 return klasses.empty();
158 }
159 void insert(K cl, unsigned cost, T& item, bool front = false) {
160 typename Klasses::insert_commit_data insert_data;
161 std::pair<Kit, bool> ret =
162 klasses.insert_unique_check(cl, MapKey<Klass, K>(), insert_data);
163 if (ret.second) {
164 ret.first = klasses.insert_unique_commit(*new Klass(cl), insert_data);
165 check_end();
166 }
167 ret.first->insert(cost, item, front);
168 }
169 unsigned get_cost() const {
170 assert(!empty());
171 return next->get_cost();
172 }
173 T pop() {
174 T ret = next->pop();
175 if (next->empty()) {
176 next = klasses.erase_and_dispose(next, DelItem<Klass>());
177 } else {
178 ++next;
179 }
180 check_end();
181 return ret;
182 }
183 unsigned filter_list_pairs(std::function<bool (T)>& f) {
184 unsigned count = 0;
185 // intrusive containers can't erase with a reverse_iterator
186 // so we have to walk backwards on our own. Since there is
187 // no iterator before begin, we have to test at the end.
188 for (Kit i = klasses.begin(); i != klasses.end();) {
189 count += i->filter_list_pairs(f);
190 if (i->empty()) {
191 if (next == i) {
192 ++next;
193 }
194 i = klasses.erase_and_dispose(i, DelItem<Klass>());
195 } else {
196 ++i;
197 }
198 }
199 check_end();
200 return count;
201 }
202 unsigned filter_class(K& cl, std::list<T>* out) {
203 unsigned count = 0;
204 Kit i = klasses.find(cl, MapKey<Klass, K>());
205 if (i != klasses.end()) {
206 count = i->filter_class(out);
207 Kit tmp = klasses.erase_and_dispose(i, DelItem<Klass>());
208 if (next == i) {
209 next = tmp;
210 }
211 check_end();
212 }
213 return count;
214 }
215 void dump(ceph::Formatter *f) const {
216 f->dump_int("num_keys", next->get_size());
217 if (!empty()) {
218 f->dump_int("first_item_cost", next->get_cost());
219 }
220 }
221 };
222 class Queue {
223 typedef bi::rbtree<SubQueue> SubQueues;
224 typedef typename SubQueues::iterator Sit;
225 SubQueues queues;
226 unsigned total_prio;
227 unsigned max_cost;
228 public:
229 unsigned size;
230 Queue() :
231 total_prio(0),
232 max_cost(0),
233 size(0)
234 {}
235 bool empty() const {
236 return !size;
237 }
238 void insert(unsigned p, K cl, unsigned cost, T& item, bool front = false) {
239 typename SubQueues::insert_commit_data insert_data;
240 std::pair<typename SubQueues::iterator, bool> ret =
241 queues.insert_unique_check(p, MapKey<SubQueue, unsigned>(), insert_data);
242 if (ret.second) {
243 ret.first = queues.insert_unique_commit(*new SubQueue(p), insert_data);
244 total_prio += p;
245 }
246 ret.first->insert(cl, cost, item, front);
247 if (cost > max_cost) {
248 max_cost = cost;
249 }
250 ++size;
251 }
252 T pop(bool strict = false) {
253 --size;
254 Sit i = --queues.end();
255 if (strict) {
256 T ret = i->pop();
257 if (i->empty()) {
258 queues.erase_and_dispose(i, DelItem<SubQueue>());
259 }
260 return ret;
261 }
262 if (queues.size() > 1) {
263 while (true) {
264 // Pick a new priority out of the total priority.
265 unsigned prio = rand() % total_prio + 1;
266 unsigned tp = total_prio - i->key;
267 // Find the priority coresponding to the picked number.
268 // Subtract high priorities to low priorities until the picked number
269 // is more than the total and try to dequeue that priority.
270 // Reverse the direction from previous implementation because there is a higher
271 // chance of dequeuing a high priority op so spend less time spinning.
272 while (prio <= tp) {
273 --i;
274 tp -= i->key;
275 }
276 // Flip a coin to see if this priority gets to run based on cost.
277 // The next op's cost is multiplied by .9 and subtracted from the
278 // max cost seen. Ops with lower costs will have a larger value
279 // and allow them to be selected easier than ops with high costs.
280 if (max_cost == 0 || rand() % max_cost <=
281 (max_cost - ((i->get_cost() * 9) / 10))) {
282 break;
283 }
284 i = --queues.end();
285 }
286 }
287 T ret = i->pop();
288 if (i->empty()) {
289 total_prio -= i->key;
290 queues.erase_and_dispose(i, DelItem<SubQueue>());
291 }
292 return ret;
293 }
294 void filter_list_pairs(std::function<bool (T)>& f) {
295 for (Sit i = queues.begin(); i != queues.end();) {
296 size -= i->filter_list_pairs(f);
297 if (i->empty()) {
298 total_prio -= i->key;
299 i = queues.erase_and_dispose(i, DelItem<SubQueue>());
300 } else {
301 ++i;
302 }
303 }
304 }
305 void filter_class(K& cl, std::list<T>* out) {
306 for (Sit i = queues.begin(); i != queues.end();) {
307 size -= i->filter_class(cl, out);
308 if (i->empty()) {
309 total_prio -= i->key;
310 i = queues.erase_and_dispose(i, DelItem<SubQueue>());
311 } else {
312 ++i;
313 }
314 }
315 }
316 void dump(ceph::Formatter *f) const {
317 for (typename SubQueues::const_iterator i = queues.begin();
318 i != queues.end(); ++i) {
319 f->dump_int("total_priority", total_prio);
320 f->dump_int("max_cost", max_cost);
321 f->open_object_section("subqueue");
322 f->dump_int("priority", i->key);
323 i->dump(f);
324 f->close_section();
325 }
326 }
327 };
328
329 Queue strict;
330 Queue normal;
331 public:
332 WeightedPriorityQueue(unsigned max_per, unsigned min_c) :
333 strict(),
334 normal()
335 {
336 std::srand(time(0));
337 }
338 unsigned length() const final {
339 return strict.size + normal.size;
340 }
341 void remove_by_filter(std::function<bool (T)> f) final {
342 strict.filter_list_pairs(f);
343 normal.filter_list_pairs(f);
344 }
345 void remove_by_class(K cl, std::list<T>* removed = 0) final {
346 strict.filter_class(cl, removed);
347 normal.filter_class(cl, removed);
348 }
349 bool empty() const final {
350 return !(strict.size + normal.size);
351 }
352 void enqueue_strict(K cl, unsigned p, T item) final {
353 strict.insert(p, cl, 0, item);
354 }
355 void enqueue_strict_front(K cl, unsigned p, T item) final {
356 strict.insert(p, cl, 0, item, true);
357 }
358 void enqueue(K cl, unsigned p, unsigned cost, T item) final {
359 normal.insert(p, cl, cost, item);
360 }
361 void enqueue_front(K cl, unsigned p, unsigned cost, T item) final {
362 normal.insert(p, cl, cost, item, true);
363 }
364 T dequeue() override {
365 assert(strict.size + normal.size > 0);
366 if (!strict.empty()) {
367 return strict.pop(true);
368 }
369 return normal.pop();
370 }
371 void dump(ceph::Formatter *f) const override {
372 f->open_array_section("high_queues");
373 strict.dump(f);
374 f->close_section();
375 f->open_array_section("queues");
376 normal.dump(f);
377 f->close_section();
378 }
379 };
380
381 #endif