]> git.proxmox.com Git - ceph.git/blob - ceph/src/common/PrioritizedQueue.h
8d9cd95b28e418d1675729e4e020da48cb53e61c
[ceph.git] / ceph / src / common / PrioritizedQueue.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef PRIORITY_QUEUE_H
16 #define PRIORITY_QUEUE_H
17
18 #include "common/Formatter.h"
19 #include "common/OpQueue.h"
20
21 /**
22 * Manages queue for normal and strict priority items
23 *
24 * On dequeue, the queue will select the lowest priority queue
25 * such that the q has bucket > cost of front queue item.
26 *
27 * If there is no such queue, we choose the next queue item for
28 * the highest priority queue.
29 *
30 * Before returning a dequeued item, we place into each bucket
31 * cost * (priority/total_priority) tokens.
32 *
33 * enqueue_strict and enqueue_strict_front queue items into queues
34 * which are serviced in strict priority order before items queued
35 * with enqueue and enqueue_front
36 *
37 * Within a priority class, we schedule round robin based on the class
38 * of type K used to enqueue items. e.g. you could use entity_inst_t
39 * to provide fairness for different clients.
40 */
41 template <typename T, typename K>
42 class PrioritizedQueue : public OpQueue <T, K> {
43 int64_t total_priority;
44 int64_t max_tokens_per_subqueue;
45 int64_t min_cost;
46
47 typedef std::list<std::pair<unsigned, T> > ListPairs;
48 static unsigned filter_list_pairs(
49 ListPairs *l,
50 std::function<bool (T)> f) {
51 unsigned ret = 0;
52 for (typename ListPairs::iterator i = l->end();
53 i != l->begin();
54 ) {
55 auto next = i;
56 --next;
57 if (f(next->second)) {
58 ++ret;
59 l->erase(next);
60 } else {
61 i = next;
62 }
63 }
64 return ret;
65 }
66
67 struct SubQueue {
68 private:
69 typedef std::map<K, ListPairs> Classes;
70 Classes q;
71 unsigned tokens, max_tokens;
72 int64_t size;
73 typename Classes::iterator cur;
74 public:
75 SubQueue(const SubQueue &other)
76 : q(other.q),
77 tokens(other.tokens),
78 max_tokens(other.max_tokens),
79 size(other.size),
80 cur(q.begin()) {}
81 SubQueue()
82 : tokens(0),
83 max_tokens(0),
84 size(0), cur(q.begin()) {}
85 void set_max_tokens(unsigned mt) {
86 max_tokens = mt;
87 }
88 unsigned get_max_tokens() const {
89 return max_tokens;
90 }
91 unsigned num_tokens() const {
92 return tokens;
93 }
94 void put_tokens(unsigned t) {
95 tokens += t;
96 if (tokens > max_tokens) {
97 tokens = max_tokens;
98 }
99 }
100 void take_tokens(unsigned t) {
101 if (tokens > t) {
102 tokens -= t;
103 } else {
104 tokens = 0;
105 }
106 }
107 void enqueue(K cl, unsigned cost, T item) {
108 q[cl].push_back(std::make_pair(cost, item));
109 if (cur == q.end())
110 cur = q.begin();
111 size++;
112 }
113 void enqueue_front(K cl, unsigned cost, T item) {
114 q[cl].push_front(std::make_pair(cost, item));
115 if (cur == q.end())
116 cur = q.begin();
117 size++;
118 }
119 std::pair<unsigned, T> front() const {
120 assert(!(q.empty()));
121 assert(cur != q.end());
122 return cur->second.front();
123 }
124 void pop_front() {
125 assert(!(q.empty()));
126 assert(cur != q.end());
127 cur->second.pop_front();
128 if (cur->second.empty()) {
129 q.erase(cur++);
130 } else {
131 ++cur;
132 }
133 if (cur == q.end()) {
134 cur = q.begin();
135 }
136 size--;
137 }
138 unsigned length() const {
139 assert(size >= 0);
140 return (unsigned)size;
141 }
142 bool empty() const {
143 return q.empty();
144 }
145 void remove_by_filter(
146 std::function<bool (T)> f) {
147 for (typename Classes::iterator i = q.begin();
148 i != q.end();
149 ) {
150 size -= filter_list_pairs(&(i->second), f);
151 if (i->second.empty()) {
152 if (cur == i) {
153 ++cur;
154 }
155 q.erase(i++);
156 } else {
157 ++i;
158 }
159 }
160 if (cur == q.end())
161 cur = q.begin();
162 }
163 void remove_by_class(K k, std::list<T> *out) {
164 typename Classes::iterator i = q.find(k);
165 if (i == q.end()) {
166 return;
167 }
168 size -= i->second.size();
169 if (i == cur) {
170 ++cur;
171 }
172 if (out) {
173 for (typename ListPairs::reverse_iterator j =
174 i->second.rbegin();
175 j != i->second.rend();
176 ++j) {
177 out->push_front(j->second);
178 }
179 }
180 q.erase(i);
181 if (cur == q.end()) {
182 cur = q.begin();
183 }
184 }
185
186 void dump(ceph::Formatter *f) const {
187 f->dump_int("tokens", tokens);
188 f->dump_int("max_tokens", max_tokens);
189 f->dump_int("size", size);
190 f->dump_int("num_keys", q.size());
191 if (!empty()) {
192 f->dump_int("first_item_cost", front().first);
193 }
194 }
195 };
196
197 typedef std::map<unsigned, SubQueue> SubQueues;
198 SubQueues high_queue;
199 SubQueues queue;
200
201 SubQueue *create_queue(unsigned priority) {
202 typename SubQueues::iterator p = queue.find(priority);
203 if (p != queue.end()) {
204 return &p->second;
205 }
206 total_priority += priority;
207 SubQueue *sq = &queue[priority];
208 sq->set_max_tokens(max_tokens_per_subqueue);
209 return sq;
210 }
211
212 void remove_queue(unsigned priority) {
213 assert(queue.count(priority));
214 queue.erase(priority);
215 total_priority -= priority;
216 assert(total_priority >= 0);
217 }
218
219 void distribute_tokens(unsigned cost) {
220 if (total_priority == 0) {
221 return;
222 }
223 for (typename SubQueues::iterator i = queue.begin();
224 i != queue.end();
225 ++i) {
226 i->second.put_tokens(((i->first * cost) / total_priority) + 1);
227 }
228 }
229
230 public:
231 PrioritizedQueue(unsigned max_per, unsigned min_c)
232 : total_priority(0),
233 max_tokens_per_subqueue(max_per),
234 min_cost(min_c)
235 {}
236
237 unsigned length() const final {
238 unsigned total = 0;
239 for (typename SubQueues::const_iterator i = queue.begin();
240 i != queue.end();
241 ++i) {
242 assert(i->second.length());
243 total += i->second.length();
244 }
245 for (typename SubQueues::const_iterator i = high_queue.begin();
246 i != high_queue.end();
247 ++i) {
248 assert(i->second.length());
249 total += i->second.length();
250 }
251 return total;
252 }
253
254 void remove_by_filter(
255 std::function<bool (T)> f) final {
256 for (typename SubQueues::iterator i = queue.begin();
257 i != queue.end();
258 ) {
259 unsigned priority = i->first;
260
261 i->second.remove_by_filter(f);
262 if (i->second.empty()) {
263 ++i;
264 remove_queue(priority);
265 } else {
266 ++i;
267 }
268 }
269 for (typename SubQueues::iterator i = high_queue.begin();
270 i != high_queue.end();
271 ) {
272 i->second.remove_by_filter(f);
273 if (i->second.empty()) {
274 high_queue.erase(i++);
275 } else {
276 ++i;
277 }
278 }
279 }
280
281 void remove_by_class(K k, std::list<T> *out = 0) final {
282 for (typename SubQueues::iterator i = queue.begin();
283 i != queue.end();
284 ) {
285 i->second.remove_by_class(k, out);
286 if (i->second.empty()) {
287 unsigned priority = i->first;
288 ++i;
289 remove_queue(priority);
290 } else {
291 ++i;
292 }
293 }
294 for (typename SubQueues::iterator i = high_queue.begin();
295 i != high_queue.end();
296 ) {
297 i->second.remove_by_class(k, out);
298 if (i->second.empty()) {
299 high_queue.erase(i++);
300 } else {
301 ++i;
302 }
303 }
304 }
305
306 void enqueue_strict(K cl, unsigned priority, T item) final {
307 high_queue[priority].enqueue(cl, 0, item);
308 }
309
310 void enqueue_strict_front(K cl, unsigned priority, T item) final {
311 high_queue[priority].enqueue_front(cl, 0, item);
312 }
313
314 void enqueue(K cl, unsigned priority, unsigned cost, T item) final {
315 if (cost < min_cost)
316 cost = min_cost;
317 if (cost > max_tokens_per_subqueue)
318 cost = max_tokens_per_subqueue;
319 create_queue(priority)->enqueue(cl, cost, item);
320 }
321
322 void enqueue_front(K cl, unsigned priority, unsigned cost, T item) final {
323 if (cost < min_cost)
324 cost = min_cost;
325 if (cost > max_tokens_per_subqueue)
326 cost = max_tokens_per_subqueue;
327 create_queue(priority)->enqueue_front(cl, cost, item);
328 }
329
330 bool empty() const final {
331 assert(total_priority >= 0);
332 assert((total_priority == 0) || !(queue.empty()));
333 return queue.empty() && high_queue.empty();
334 }
335
336 T dequeue() final {
337 assert(!empty());
338
339 if (!(high_queue.empty())) {
340 T ret = high_queue.rbegin()->second.front().second;
341 high_queue.rbegin()->second.pop_front();
342 if (high_queue.rbegin()->second.empty()) {
343 high_queue.erase(high_queue.rbegin()->first);
344 }
345 return ret;
346 }
347
348 // if there are multiple buckets/subqueues with sufficient tokens,
349 // we behave like a strict priority queue among all subqueues that
350 // are eligible to run.
351 for (typename SubQueues::iterator i = queue.begin();
352 i != queue.end();
353 ++i) {
354 assert(!(i->second.empty()));
355 if (i->second.front().first < i->second.num_tokens()) {
356 T ret = i->second.front().second;
357 unsigned cost = i->second.front().first;
358 i->second.take_tokens(cost);
359 i->second.pop_front();
360 if (i->second.empty()) {
361 remove_queue(i->first);
362 }
363 distribute_tokens(cost);
364 return ret;
365 }
366 }
367
368 // if no subqueues have sufficient tokens, we behave like a strict
369 // priority queue.
370 T ret = queue.rbegin()->second.front().second;
371 unsigned cost = queue.rbegin()->second.front().first;
372 queue.rbegin()->second.pop_front();
373 if (queue.rbegin()->second.empty()) {
374 remove_queue(queue.rbegin()->first);
375 }
376 distribute_tokens(cost);
377 return ret;
378 }
379
380 void dump(ceph::Formatter *f) const final {
381 f->dump_int("total_priority", total_priority);
382 f->dump_int("max_tokens_per_subqueue", max_tokens_per_subqueue);
383 f->dump_int("min_cost", min_cost);
384 f->open_array_section("high_queues");
385 for (typename SubQueues::const_iterator p = high_queue.begin();
386 p != high_queue.end();
387 ++p) {
388 f->open_object_section("subqueue");
389 f->dump_int("priority", p->first);
390 p->second.dump(f);
391 f->close_section();
392 }
393 f->close_section();
394 f->open_array_section("queues");
395 for (typename SubQueues::const_iterator p = queue.begin();
396 p != queue.end();
397 ++p) {
398 f->open_object_section("subqueue");
399 f->dump_int("priority", p->first);
400 p->second.dump(f);
401 f->close_section();
402 }
403 f->close_section();
404 }
405 };
406
407 #endif