]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #ifndef WP_QUEUE_H | |
16 | #define WP_QUEUE_H | |
17 | ||
18 | #include "OpQueue.h" | |
19 | ||
20 | #include <boost/intrusive/list.hpp> | |
21 | #include <boost/intrusive/rbtree.hpp> | |
22 | #include <boost/intrusive/avl_set.hpp> | |
23 | ||
24 | namespace bi = boost::intrusive; | |
25 | ||
/**
 * Heterogeneous "less than" comparator between a bare key of type S and a
 * node of type T that exposes a public `key` member.
 *
 * Both argument orders are provided so that ordered containers can look up
 * a node by key without having to build a temporary node first.
 */
template <typename T, typename S>
class MapKey
{
  public:
    // True when the bare key orders strictly before the node's key.
    bool operator()(const S lookup, const T &node) const
    {
      return lookup < node.key;
    }

    // True when the node's key orders strictly before the bare key.
    bool operator()(const T &node, const S lookup) const
    {
      return node.key < lookup;
    }
};
39 | ||
/**
 * Disposer functor: releases a heap-allocated T with `delete`.
 *
 * Intended to be handed to container operations that unlink a node and then
 * pass the raw pointer to a callback for destruction.
 */
template <typename T>
class DelItem
{
  public:
    void operator()(T* node)
    {
      delete node;
    }
};
47 | ||
48 | template <typename T, typename K> | |
49 | class WeightedPriorityQueue : public OpQueue <T, K> | |
50 | { | |
51 | private: | |
52 | class ListPair : public bi::list_base_hook<> | |
53 | { | |
54 | public: | |
55 | unsigned cost; | |
56 | T item; | |
57 | ListPair(unsigned c, T& i) : | |
58 | cost(c), | |
59 | item(i) | |
60 | {} | |
61 | }; | |
62 | class Klass : public bi::set_base_hook<> | |
63 | { | |
64 | typedef bi::list<ListPair> ListPairs; | |
65 | typedef typename ListPairs::iterator Lit; | |
66 | public: | |
67 | K key; // klass | |
68 | ListPairs lp; | |
69 | Klass(K& k) : | |
70 | key(k) | |
71 | {} | |
72 | friend bool operator< (const Klass &a, const Klass &b) | |
73 | { return a.key < b.key; } | |
74 | friend bool operator> (const Klass &a, const Klass &b) | |
75 | { return a.key > b.key; } | |
76 | friend bool operator== (const Klass &a, const Klass &b) | |
77 | { return a.key == b.key; } | |
78 | void insert(unsigned cost, T& item, bool front) { | |
79 | if (front) { | |
80 | lp.push_front(*new ListPair(cost, item)); | |
81 | } else { | |
82 | lp.push_back(*new ListPair(cost, item)); | |
83 | } | |
84 | } | |
85 | //Get the cost of the next item to dequeue | |
86 | unsigned get_cost() const { | |
87 | assert(!empty()); | |
88 | return lp.begin()->cost; | |
89 | } | |
90 | T pop() { | |
91 | assert(!lp.empty()); | |
92 | T ret = lp.begin()->item; | |
93 | lp.erase_and_dispose(lp.begin(), DelItem<ListPair>()); | |
94 | return ret; | |
95 | } | |
96 | bool empty() const { | |
97 | return lp.empty(); | |
98 | } | |
99 | unsigned get_size() const { | |
100 | return lp.size(); | |
101 | } | |
7c673cae FG |
102 | unsigned filter_class(std::list<T>* out) { |
103 | unsigned count = 0; | |
104 | for (Lit i = --lp.end();; --i) { | |
105 | if (out) { | |
106 | out->push_front(i->item); | |
107 | } | |
108 | i = lp.erase_and_dispose(i, DelItem<ListPair>()); | |
109 | ++count; | |
110 | if (i == lp.begin()) { | |
111 | break; | |
112 | } | |
113 | } | |
114 | return count; | |
115 | } | |
116 | }; | |
117 | class SubQueue : public bi::set_base_hook<> | |
118 | { | |
119 | typedef bi::rbtree<Klass> Klasses; | |
120 | typedef typename Klasses::iterator Kit; | |
121 | void check_end() { | |
122 | if (next == klasses.end()) { | |
123 | next = klasses.begin(); | |
124 | } | |
125 | } | |
126 | public: | |
127 | unsigned key; // priority | |
128 | Klasses klasses; | |
129 | Kit next; | |
130 | SubQueue(unsigned& p) : | |
131 | key(p), | |
132 | next(klasses.begin()) | |
133 | {} | |
134 | friend bool operator< (const SubQueue &a, const SubQueue &b) | |
135 | { return a.key < b.key; } | |
136 | friend bool operator> (const SubQueue &a, const SubQueue &b) | |
137 | { return a.key > b.key; } | |
138 | friend bool operator== (const SubQueue &a, const SubQueue &b) | |
139 | { return a.key == b.key; } | |
140 | bool empty() const { | |
141 | return klasses.empty(); | |
142 | } | |
143 | void insert(K cl, unsigned cost, T& item, bool front = false) { | |
144 | typename Klasses::insert_commit_data insert_data; | |
145 | std::pair<Kit, bool> ret = | |
146 | klasses.insert_unique_check(cl, MapKey<Klass, K>(), insert_data); | |
147 | if (ret.second) { | |
148 | ret.first = klasses.insert_unique_commit(*new Klass(cl), insert_data); | |
149 | check_end(); | |
150 | } | |
151 | ret.first->insert(cost, item, front); | |
152 | } | |
153 | unsigned get_cost() const { | |
154 | assert(!empty()); | |
155 | return next->get_cost(); | |
156 | } | |
157 | T pop() { | |
158 | T ret = next->pop(); | |
159 | if (next->empty()) { | |
160 | next = klasses.erase_and_dispose(next, DelItem<Klass>()); | |
161 | } else { | |
162 | ++next; | |
163 | } | |
164 | check_end(); | |
165 | return ret; | |
166 | } | |
7c673cae FG |
167 | unsigned filter_class(K& cl, std::list<T>* out) { |
168 | unsigned count = 0; | |
169 | Kit i = klasses.find(cl, MapKey<Klass, K>()); | |
170 | if (i != klasses.end()) { | |
171 | count = i->filter_class(out); | |
172 | Kit tmp = klasses.erase_and_dispose(i, DelItem<Klass>()); | |
173 | if (next == i) { | |
174 | next = tmp; | |
175 | } | |
176 | check_end(); | |
177 | } | |
178 | return count; | |
179 | } | |
180 | void dump(ceph::Formatter *f) const { | |
181 | f->dump_int("num_keys", next->get_size()); | |
182 | if (!empty()) { | |
183 | f->dump_int("first_item_cost", next->get_cost()); | |
184 | } | |
185 | } | |
186 | }; | |
187 | class Queue { | |
188 | typedef bi::rbtree<SubQueue> SubQueues; | |
189 | typedef typename SubQueues::iterator Sit; | |
190 | SubQueues queues; | |
191 | unsigned total_prio; | |
192 | unsigned max_cost; | |
193 | public: | |
194 | unsigned size; | |
195 | Queue() : | |
196 | total_prio(0), | |
197 | max_cost(0), | |
198 | size(0) | |
199 | {} | |
200 | bool empty() const { | |
201 | return !size; | |
202 | } | |
203 | void insert(unsigned p, K cl, unsigned cost, T& item, bool front = false) { | |
204 | typename SubQueues::insert_commit_data insert_data; | |
205 | std::pair<typename SubQueues::iterator, bool> ret = | |
206 | queues.insert_unique_check(p, MapKey<SubQueue, unsigned>(), insert_data); | |
207 | if (ret.second) { | |
208 | ret.first = queues.insert_unique_commit(*new SubQueue(p), insert_data); | |
209 | total_prio += p; | |
210 | } | |
211 | ret.first->insert(cl, cost, item, front); | |
212 | if (cost > max_cost) { | |
213 | max_cost = cost; | |
214 | } | |
215 | ++size; | |
216 | } | |
217 | T pop(bool strict = false) { | |
218 | --size; | |
219 | Sit i = --queues.end(); | |
220 | if (strict) { | |
221 | T ret = i->pop(); | |
222 | if (i->empty()) { | |
223 | queues.erase_and_dispose(i, DelItem<SubQueue>()); | |
224 | } | |
225 | return ret; | |
226 | } | |
227 | if (queues.size() > 1) { | |
228 | while (true) { | |
229 | // Pick a new priority out of the total priority. | |
230 | unsigned prio = rand() % total_prio + 1; | |
231 | unsigned tp = total_prio - i->key; | |
232 | // Find the priority coresponding to the picked number. | |
233 | // Subtract high priorities to low priorities until the picked number | |
234 | // is more than the total and try to dequeue that priority. | |
235 | // Reverse the direction from previous implementation because there is a higher | |
236 | // chance of dequeuing a high priority op so spend less time spinning. | |
237 | while (prio <= tp) { | |
238 | --i; | |
239 | tp -= i->key; | |
240 | } | |
241 | // Flip a coin to see if this priority gets to run based on cost. | |
242 | // The next op's cost is multiplied by .9 and subtracted from the | |
243 | // max cost seen. Ops with lower costs will have a larger value | |
244 | // and allow them to be selected easier than ops with high costs. | |
245 | if (max_cost == 0 || rand() % max_cost <= | |
246 | (max_cost - ((i->get_cost() * 9) / 10))) { | |
247 | break; | |
248 | } | |
249 | i = --queues.end(); | |
250 | } | |
251 | } | |
252 | T ret = i->pop(); | |
253 | if (i->empty()) { | |
254 | total_prio -= i->key; | |
255 | queues.erase_and_dispose(i, DelItem<SubQueue>()); | |
256 | } | |
257 | return ret; | |
258 | } | |
7c673cae FG |
259 | void filter_class(K& cl, std::list<T>* out) { |
260 | for (Sit i = queues.begin(); i != queues.end();) { | |
261 | size -= i->filter_class(cl, out); | |
262 | if (i->empty()) { | |
263 | total_prio -= i->key; | |
264 | i = queues.erase_and_dispose(i, DelItem<SubQueue>()); | |
265 | } else { | |
266 | ++i; | |
267 | } | |
268 | } | |
269 | } | |
270 | void dump(ceph::Formatter *f) const { | |
271 | for (typename SubQueues::const_iterator i = queues.begin(); | |
272 | i != queues.end(); ++i) { | |
273 | f->dump_int("total_priority", total_prio); | |
274 | f->dump_int("max_cost", max_cost); | |
275 | f->open_object_section("subqueue"); | |
276 | f->dump_int("priority", i->key); | |
277 | i->dump(f); | |
278 | f->close_section(); | |
279 | } | |
280 | } | |
281 | }; | |
282 | ||
283 | Queue strict; | |
284 | Queue normal; | |
285 | public: | |
286 | WeightedPriorityQueue(unsigned max_per, unsigned min_c) : | |
287 | strict(), | |
288 | normal() | |
289 | { | |
290 | std::srand(time(0)); | |
291 | } | |
292 | unsigned length() const final { | |
293 | return strict.size + normal.size; | |
294 | } | |
7c673cae FG |
295 | void remove_by_class(K cl, std::list<T>* removed = 0) final { |
296 | strict.filter_class(cl, removed); | |
297 | normal.filter_class(cl, removed); | |
298 | } | |
299 | bool empty() const final { | |
300 | return !(strict.size + normal.size); | |
301 | } | |
302 | void enqueue_strict(K cl, unsigned p, T item) final { | |
303 | strict.insert(p, cl, 0, item); | |
304 | } | |
305 | void enqueue_strict_front(K cl, unsigned p, T item) final { | |
306 | strict.insert(p, cl, 0, item, true); | |
307 | } | |
308 | void enqueue(K cl, unsigned p, unsigned cost, T item) final { | |
309 | normal.insert(p, cl, cost, item); | |
310 | } | |
311 | void enqueue_front(K cl, unsigned p, unsigned cost, T item) final { | |
312 | normal.insert(p, cl, cost, item, true); | |
313 | } | |
314 | T dequeue() override { | |
315 | assert(strict.size + normal.size > 0); | |
316 | if (!strict.empty()) { | |
317 | return strict.pop(true); | |
318 | } | |
319 | return normal.pop(); | |
320 | } | |
321 | void dump(ceph::Formatter *f) const override { | |
322 | f->open_array_section("high_queues"); | |
323 | strict.dump(f); | |
324 | f->close_section(); | |
325 | f->open_array_section("queues"); | |
326 | normal.dump(f); | |
327 | f->close_section(); | |
328 | } | |
329 | }; | |
330 | ||
331 | #endif |