]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #ifndef WP_QUEUE_H | |
16 | #define WP_QUEUE_H | |
17 | ||
18 | #include "OpQueue.h" | |
19 | ||
20 | #include <boost/intrusive/list.hpp> | |
21 | #include <boost/intrusive/rbtree.hpp> | |
22 | #include <boost/intrusive/avl_set.hpp> | |
23 | ||
11fdf7f2 TL |
24 | #include "include/ceph_assert.h" |
25 | ||
7c673cae FG |
26 | namespace bi = boost::intrusive; |
27 | ||
28 | template <typename T, typename S> | |
29 | class MapKey | |
30 | { | |
31 | public: | |
32 | bool operator()(const S i, const T &k) const | |
33 | { | |
34 | return i < k.key; | |
35 | } | |
36 | bool operator()(const T &k, const S i) const | |
37 | { | |
38 | return k.key < i; | |
39 | } | |
40 | }; | |
41 | ||
42 | template <typename T> | |
43 | class DelItem | |
44 | { | |
45 | public: | |
46 | void operator()(T* delete_this) | |
47 | { delete delete_this; } | |
48 | }; | |
49 | ||
50 | template <typename T, typename K> | |
51 | class WeightedPriorityQueue : public OpQueue <T, K> | |
52 | { | |
53 | private: | |
54 | class ListPair : public bi::list_base_hook<> | |
55 | { | |
56 | public: | |
57 | unsigned cost; | |
58 | T item; | |
11fdf7f2 | 59 | ListPair(unsigned c, T&& i) : |
7c673cae | 60 | cost(c), |
11fdf7f2 TL |
61 | item(std::move(i)) |
62 | {} | |
7c673cae FG |
63 | }; |
64 | class Klass : public bi::set_base_hook<> | |
65 | { | |
66 | typedef bi::list<ListPair> ListPairs; | |
67 | typedef typename ListPairs::iterator Lit; | |
68 | public: | |
69 | K key; // klass | |
70 | ListPairs lp; | |
71 | Klass(K& k) : | |
f64942e4 AA |
72 | key(k) { |
73 | } | |
74 | ~Klass() { | |
75 | lp.clear_and_dispose(DelItem<ListPair>()); | |
76 | } | |
7c673cae FG |
77 | friend bool operator< (const Klass &a, const Klass &b) |
78 | { return a.key < b.key; } | |
79 | friend bool operator> (const Klass &a, const Klass &b) | |
80 | { return a.key > b.key; } | |
81 | friend bool operator== (const Klass &a, const Klass &b) | |
82 | { return a.key == b.key; } | |
11fdf7f2 | 83 | void insert(unsigned cost, T&& item, bool front) { |
7c673cae | 84 | if (front) { |
11fdf7f2 | 85 | lp.push_front(*new ListPair(cost, std::move(item))); |
7c673cae | 86 | } else { |
11fdf7f2 | 87 | lp.push_back(*new ListPair(cost, std::move(item))); |
7c673cae FG |
88 | } |
89 | } | |
90 | //Get the cost of the next item to dequeue | |
91 | unsigned get_cost() const { | |
11fdf7f2 | 92 | ceph_assert(!empty()); |
7c673cae FG |
93 | return lp.begin()->cost; |
94 | } | |
95 | T pop() { | |
11fdf7f2 TL |
96 | ceph_assert(!lp.empty()); |
97 | T ret = std::move(lp.begin()->item); | |
7c673cae FG |
98 | lp.erase_and_dispose(lp.begin(), DelItem<ListPair>()); |
99 | return ret; | |
100 | } | |
101 | bool empty() const { | |
102 | return lp.empty(); | |
103 | } | |
104 | unsigned get_size() const { | |
105 | return lp.size(); | |
106 | } | |
11fdf7f2 | 107 | void filter_class(std::list<T>* out) { |
7c673cae FG |
108 | for (Lit i = --lp.end();; --i) { |
109 | if (out) { | |
11fdf7f2 | 110 | out->push_front(std::move(i->item)); |
7c673cae FG |
111 | } |
112 | i = lp.erase_and_dispose(i, DelItem<ListPair>()); | |
7c673cae FG |
113 | if (i == lp.begin()) { |
114 | break; | |
115 | } | |
116 | } | |
7c673cae FG |
117 | } |
118 | }; | |
119 | class SubQueue : public bi::set_base_hook<> | |
120 | { | |
121 | typedef bi::rbtree<Klass> Klasses; | |
122 | typedef typename Klasses::iterator Kit; | |
123 | void check_end() { | |
124 | if (next == klasses.end()) { | |
125 | next = klasses.begin(); | |
126 | } | |
127 | } | |
128 | public: | |
129 | unsigned key; // priority | |
130 | Klasses klasses; | |
131 | Kit next; | |
132 | SubQueue(unsigned& p) : | |
133 | key(p), | |
f64942e4 AA |
134 | next(klasses.begin()) { |
135 | } | |
136 | ~SubQueue() { | |
137 | klasses.clear_and_dispose(DelItem<Klass>()); | |
138 | } | |
7c673cae FG |
139 | friend bool operator< (const SubQueue &a, const SubQueue &b) |
140 | { return a.key < b.key; } | |
141 | friend bool operator> (const SubQueue &a, const SubQueue &b) | |
142 | { return a.key > b.key; } | |
143 | friend bool operator== (const SubQueue &a, const SubQueue &b) | |
144 | { return a.key == b.key; } | |
145 | bool empty() const { | |
146 | return klasses.empty(); | |
147 | } | |
11fdf7f2 | 148 | void insert(K cl, unsigned cost, T&& item, bool front = false) { |
7c673cae FG |
149 | typename Klasses::insert_commit_data insert_data; |
150 | std::pair<Kit, bool> ret = | |
151 | klasses.insert_unique_check(cl, MapKey<Klass, K>(), insert_data); | |
152 | if (ret.second) { | |
153 | ret.first = klasses.insert_unique_commit(*new Klass(cl), insert_data); | |
154 | check_end(); | |
155 | } | |
11fdf7f2 | 156 | ret.first->insert(cost, std::move(item), front); |
7c673cae FG |
157 | } |
158 | unsigned get_cost() const { | |
11fdf7f2 | 159 | ceph_assert(!empty()); |
7c673cae FG |
160 | return next->get_cost(); |
161 | } | |
162 | T pop() { | |
163 | T ret = next->pop(); | |
164 | if (next->empty()) { | |
165 | next = klasses.erase_and_dispose(next, DelItem<Klass>()); | |
166 | } else { | |
167 | ++next; | |
168 | } | |
169 | check_end(); | |
170 | return ret; | |
171 | } | |
11fdf7f2 | 172 | void filter_class(K& cl, std::list<T>* out) { |
7c673cae FG |
173 | Kit i = klasses.find(cl, MapKey<Klass, K>()); |
174 | if (i != klasses.end()) { | |
11fdf7f2 | 175 | i->filter_class(out); |
7c673cae FG |
176 | Kit tmp = klasses.erase_and_dispose(i, DelItem<Klass>()); |
177 | if (next == i) { | |
178 | next = tmp; | |
179 | } | |
180 | check_end(); | |
181 | } | |
11fdf7f2 TL |
182 | } |
183 | // this is intended for unit tests and should be never used on hot paths | |
184 | unsigned get_size_slow() const { | |
185 | unsigned count = 0; | |
186 | for (const auto& klass : klasses) { | |
187 | count += klass.get_size(); | |
188 | } | |
7c673cae FG |
189 | return count; |
190 | } | |
191 | void dump(ceph::Formatter *f) const { | |
192 | f->dump_int("num_keys", next->get_size()); | |
193 | if (!empty()) { | |
194 | f->dump_int("first_item_cost", next->get_cost()); | |
195 | } | |
196 | } | |
197 | }; | |
198 | class Queue { | |
199 | typedef bi::rbtree<SubQueue> SubQueues; | |
200 | typedef typename SubQueues::iterator Sit; | |
201 | SubQueues queues; | |
202 | unsigned total_prio; | |
203 | unsigned max_cost; | |
204 | public: | |
7c673cae FG |
205 | Queue() : |
206 | total_prio(0), | |
11fdf7f2 | 207 | max_cost(0) { |
f64942e4 AA |
208 | } |
209 | ~Queue() { | |
210 | queues.clear_and_dispose(DelItem<SubQueue>()); | |
211 | } | |
7c673cae | 212 | bool empty() const { |
11fdf7f2 | 213 | return queues.empty(); |
7c673cae | 214 | } |
11fdf7f2 | 215 | void insert(unsigned p, K cl, unsigned cost, T&& item, bool front = false) { |
7c673cae FG |
216 | typename SubQueues::insert_commit_data insert_data; |
217 | std::pair<typename SubQueues::iterator, bool> ret = | |
218 | queues.insert_unique_check(p, MapKey<SubQueue, unsigned>(), insert_data); | |
219 | if (ret.second) { | |
220 | ret.first = queues.insert_unique_commit(*new SubQueue(p), insert_data); | |
221 | total_prio += p; | |
222 | } | |
11fdf7f2 | 223 | ret.first->insert(cl, cost, std::move(item), front); |
7c673cae FG |
224 | if (cost > max_cost) { |
225 | max_cost = cost; | |
226 | } | |
7c673cae FG |
227 | } |
228 | T pop(bool strict = false) { | |
7c673cae FG |
229 | Sit i = --queues.end(); |
230 | if (strict) { | |
231 | T ret = i->pop(); | |
232 | if (i->empty()) { | |
233 | queues.erase_and_dispose(i, DelItem<SubQueue>()); | |
234 | } | |
235 | return ret; | |
236 | } | |
237 | if (queues.size() > 1) { | |
238 | while (true) { | |
239 | // Pick a new priority out of the total priority. | |
240 | unsigned prio = rand() % total_prio + 1; | |
241 | unsigned tp = total_prio - i->key; | |
11fdf7f2 | 242 | // Find the priority corresponding to the picked number. |
7c673cae FG |
243 | // Subtract high priorities to low priorities until the picked number |
244 | // is more than the total and try to dequeue that priority. | |
245 | // Reverse the direction from previous implementation because there is a higher | |
246 | // chance of dequeuing a high priority op so spend less time spinning. | |
247 | while (prio <= tp) { | |
248 | --i; | |
249 | tp -= i->key; | |
250 | } | |
251 | // Flip a coin to see if this priority gets to run based on cost. | |
252 | // The next op's cost is multiplied by .9 and subtracted from the | |
253 | // max cost seen. Ops with lower costs will have a larger value | |
254 | // and allow them to be selected easier than ops with high costs. | |
255 | if (max_cost == 0 || rand() % max_cost <= | |
256 | (max_cost - ((i->get_cost() * 9) / 10))) { | |
257 | break; | |
258 | } | |
259 | i = --queues.end(); | |
260 | } | |
261 | } | |
262 | T ret = i->pop(); | |
263 | if (i->empty()) { | |
264 | total_prio -= i->key; | |
265 | queues.erase_and_dispose(i, DelItem<SubQueue>()); | |
266 | } | |
267 | return ret; | |
268 | } | |
7c673cae FG |
269 | void filter_class(K& cl, std::list<T>* out) { |
270 | for (Sit i = queues.begin(); i != queues.end();) { | |
11fdf7f2 | 271 | i->filter_class(cl, out); |
7c673cae FG |
272 | if (i->empty()) { |
273 | total_prio -= i->key; | |
274 | i = queues.erase_and_dispose(i, DelItem<SubQueue>()); | |
275 | } else { | |
276 | ++i; | |
277 | } | |
278 | } | |
279 | } | |
11fdf7f2 TL |
280 | // this is intended for unit tests and should be never used on hot paths |
281 | unsigned get_size_slow() const { | |
282 | unsigned count = 0; | |
283 | for (const auto& queue : queues) { | |
284 | count += queue.get_size_slow(); | |
285 | } | |
286 | return count; | |
287 | } | |
7c673cae FG |
288 | void dump(ceph::Formatter *f) const { |
289 | for (typename SubQueues::const_iterator i = queues.begin(); | |
290 | i != queues.end(); ++i) { | |
291 | f->dump_int("total_priority", total_prio); | |
292 | f->dump_int("max_cost", max_cost); | |
293 | f->open_object_section("subqueue"); | |
294 | f->dump_int("priority", i->key); | |
295 | i->dump(f); | |
296 | f->close_section(); | |
297 | } | |
298 | } | |
299 | }; | |
300 | ||
301 | Queue strict; | |
302 | Queue normal; | |
303 | public: | |
304 | WeightedPriorityQueue(unsigned max_per, unsigned min_c) : | |
305 | strict(), | |
306 | normal() | |
307 | { | |
308 | std::srand(time(0)); | |
309 | } | |
7c673cae FG |
310 | void remove_by_class(K cl, std::list<T>* removed = 0) final { |
311 | strict.filter_class(cl, removed); | |
312 | normal.filter_class(cl, removed); | |
313 | } | |
314 | bool empty() const final { | |
11fdf7f2 | 315 | return strict.empty() && normal.empty(); |
7c673cae | 316 | } |
11fdf7f2 TL |
317 | void enqueue_strict(K cl, unsigned p, T&& item) final { |
318 | strict.insert(p, cl, 0, std::move(item)); | |
7c673cae | 319 | } |
11fdf7f2 TL |
320 | void enqueue_strict_front(K cl, unsigned p, T&& item) final { |
321 | strict.insert(p, cl, 0, std::move(item), true); | |
7c673cae | 322 | } |
11fdf7f2 TL |
323 | void enqueue(K cl, unsigned p, unsigned cost, T&& item) final { |
324 | normal.insert(p, cl, cost, std::move(item)); | |
7c673cae | 325 | } |
11fdf7f2 TL |
326 | void enqueue_front(K cl, unsigned p, unsigned cost, T&& item) final { |
327 | normal.insert(p, cl, cost, std::move(item), true); | |
7c673cae FG |
328 | } |
329 | T dequeue() override { | |
11fdf7f2 | 330 | ceph_assert(!empty()); |
7c673cae FG |
331 | if (!strict.empty()) { |
332 | return strict.pop(true); | |
333 | } | |
334 | return normal.pop(); | |
335 | } | |
11fdf7f2 TL |
336 | unsigned get_size_slow() { |
337 | return strict.get_size_slow() + normal.get_size_slow(); | |
338 | } | |
7c673cae FG |
339 | void dump(ceph::Formatter *f) const override { |
340 | f->open_array_section("high_queues"); | |
341 | strict.dump(f); | |
342 | f->close_section(); | |
343 | f->open_array_section("queues"); | |
344 | normal.dump(f); | |
345 | f->close_section(); | |
346 | } | |
9f95a23c TL |
347 | |
348 | void print(std::ostream &ostream) const final { | |
349 | ostream << "WeightedPriorityQueue"; | |
350 | } | |
7c673cae FG |
351 | }; |
352 | ||
353 | #endif |