]> git.proxmox.com Git - ceph.git/blob - ceph/src/common/PrioritizedQueue.h
update sources to v12.1.1
[ceph.git] / ceph / src / common / PrioritizedQueue.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef PRIORITY_QUEUE_H
16 #define PRIORITY_QUEUE_H
17
18 #include "common/Formatter.h"
19 #include "common/OpQueue.h"
20
21 /**
22 * Manages queue for normal and strict priority items
23 *
24 * On dequeue, the queue will select the lowest priority queue
25 * such that the q has bucket > cost of front queue item.
26 *
27 * If there is no such queue, we choose the next queue item for
28 * the highest priority queue.
29 *
30 * Before returning a dequeued item, we place into each bucket
31 * cost * (priority/total_priority) tokens.
32 *
33 * enqueue_strict and enqueue_strict_front queue items into queues
34 * which are serviced in strict priority order before items queued
35 * with enqueue and enqueue_front
36 *
37 * Within a priority class, we schedule round robin based on the class
38 * of type K used to enqueue items. e.g. you could use entity_inst_t
39 * to provide fairness for different clients.
40 */
41 template <typename T, typename K>
42 class PrioritizedQueue : public OpQueue <T, K> {
43 int64_t total_priority;
44 int64_t max_tokens_per_subqueue;
45 int64_t min_cost;
46
47 typedef std::list<std::pair<unsigned, T> > ListPairs;
48
49 struct SubQueue {
50 private:
51 typedef std::map<K, ListPairs> Classes;
52 Classes q;
53 unsigned tokens, max_tokens;
54 int64_t size;
55 typename Classes::iterator cur;
56 public:
57 SubQueue(const SubQueue &other)
58 : q(other.q),
59 tokens(other.tokens),
60 max_tokens(other.max_tokens),
61 size(other.size),
62 cur(q.begin()) {}
63 SubQueue()
64 : tokens(0),
65 max_tokens(0),
66 size(0), cur(q.begin()) {}
67 void set_max_tokens(unsigned mt) {
68 max_tokens = mt;
69 }
70 unsigned get_max_tokens() const {
71 return max_tokens;
72 }
73 unsigned num_tokens() const {
74 return tokens;
75 }
76 void put_tokens(unsigned t) {
77 tokens += t;
78 if (tokens > max_tokens) {
79 tokens = max_tokens;
80 }
81 }
82 void take_tokens(unsigned t) {
83 if (tokens > t) {
84 tokens -= t;
85 } else {
86 tokens = 0;
87 }
88 }
89 void enqueue(K cl, unsigned cost, T item) {
90 q[cl].push_back(std::make_pair(cost, item));
91 if (cur == q.end())
92 cur = q.begin();
93 size++;
94 }
95 void enqueue_front(K cl, unsigned cost, T item) {
96 q[cl].push_front(std::make_pair(cost, item));
97 if (cur == q.end())
98 cur = q.begin();
99 size++;
100 }
101 std::pair<unsigned, T> front() const {
102 assert(!(q.empty()));
103 assert(cur != q.end());
104 return cur->second.front();
105 }
106 void pop_front() {
107 assert(!(q.empty()));
108 assert(cur != q.end());
109 cur->second.pop_front();
110 if (cur->second.empty()) {
111 q.erase(cur++);
112 } else {
113 ++cur;
114 }
115 if (cur == q.end()) {
116 cur = q.begin();
117 }
118 size--;
119 }
120 unsigned length() const {
121 assert(size >= 0);
122 return (unsigned)size;
123 }
124 bool empty() const {
125 return q.empty();
126 }
127 void remove_by_class(K k, std::list<T> *out) {
128 typename Classes::iterator i = q.find(k);
129 if (i == q.end()) {
130 return;
131 }
132 size -= i->second.size();
133 if (i == cur) {
134 ++cur;
135 }
136 if (out) {
137 for (typename ListPairs::reverse_iterator j =
138 i->second.rbegin();
139 j != i->second.rend();
140 ++j) {
141 out->push_front(j->second);
142 }
143 }
144 q.erase(i);
145 if (cur == q.end()) {
146 cur = q.begin();
147 }
148 }
149
150 void dump(ceph::Formatter *f) const {
151 f->dump_int("tokens", tokens);
152 f->dump_int("max_tokens", max_tokens);
153 f->dump_int("size", size);
154 f->dump_int("num_keys", q.size());
155 if (!empty()) {
156 f->dump_int("first_item_cost", front().first);
157 }
158 }
159 };
160
161 typedef std::map<unsigned, SubQueue> SubQueues;
162 SubQueues high_queue;
163 SubQueues queue;
164
165 SubQueue *create_queue(unsigned priority) {
166 typename SubQueues::iterator p = queue.find(priority);
167 if (p != queue.end()) {
168 return &p->second;
169 }
170 total_priority += priority;
171 SubQueue *sq = &queue[priority];
172 sq->set_max_tokens(max_tokens_per_subqueue);
173 return sq;
174 }
175
176 void remove_queue(unsigned priority) {
177 assert(queue.count(priority));
178 queue.erase(priority);
179 total_priority -= priority;
180 assert(total_priority >= 0);
181 }
182
183 void distribute_tokens(unsigned cost) {
184 if (total_priority == 0) {
185 return;
186 }
187 for (typename SubQueues::iterator i = queue.begin();
188 i != queue.end();
189 ++i) {
190 i->second.put_tokens(((i->first * cost) / total_priority) + 1);
191 }
192 }
193
194 public:
195 PrioritizedQueue(unsigned max_per, unsigned min_c)
196 : total_priority(0),
197 max_tokens_per_subqueue(max_per),
198 min_cost(min_c)
199 {}
200
201 unsigned length() const final {
202 unsigned total = 0;
203 for (typename SubQueues::const_iterator i = queue.begin();
204 i != queue.end();
205 ++i) {
206 assert(i->second.length());
207 total += i->second.length();
208 }
209 for (typename SubQueues::const_iterator i = high_queue.begin();
210 i != high_queue.end();
211 ++i) {
212 assert(i->second.length());
213 total += i->second.length();
214 }
215 return total;
216 }
217
218 void remove_by_class(K k, std::list<T> *out = 0) final {
219 for (typename SubQueues::iterator i = queue.begin();
220 i != queue.end();
221 ) {
222 i->second.remove_by_class(k, out);
223 if (i->second.empty()) {
224 unsigned priority = i->first;
225 ++i;
226 remove_queue(priority);
227 } else {
228 ++i;
229 }
230 }
231 for (typename SubQueues::iterator i = high_queue.begin();
232 i != high_queue.end();
233 ) {
234 i->second.remove_by_class(k, out);
235 if (i->second.empty()) {
236 high_queue.erase(i++);
237 } else {
238 ++i;
239 }
240 }
241 }
242
243 void enqueue_strict(K cl, unsigned priority, T item) final {
244 high_queue[priority].enqueue(cl, 0, item);
245 }
246
247 void enqueue_strict_front(K cl, unsigned priority, T item) final {
248 high_queue[priority].enqueue_front(cl, 0, item);
249 }
250
251 void enqueue(K cl, unsigned priority, unsigned cost, T item) final {
252 if (cost < min_cost)
253 cost = min_cost;
254 if (cost > max_tokens_per_subqueue)
255 cost = max_tokens_per_subqueue;
256 create_queue(priority)->enqueue(cl, cost, item);
257 }
258
259 void enqueue_front(K cl, unsigned priority, unsigned cost, T item) final {
260 if (cost < min_cost)
261 cost = min_cost;
262 if (cost > max_tokens_per_subqueue)
263 cost = max_tokens_per_subqueue;
264 create_queue(priority)->enqueue_front(cl, cost, item);
265 }
266
267 bool empty() const final {
268 assert(total_priority >= 0);
269 assert((total_priority == 0) || !(queue.empty()));
270 return queue.empty() && high_queue.empty();
271 }
272
273 T dequeue() final {
274 assert(!empty());
275
276 if (!(high_queue.empty())) {
277 T ret = high_queue.rbegin()->second.front().second;
278 high_queue.rbegin()->second.pop_front();
279 if (high_queue.rbegin()->second.empty()) {
280 high_queue.erase(high_queue.rbegin()->first);
281 }
282 return ret;
283 }
284
285 // if there are multiple buckets/subqueues with sufficient tokens,
286 // we behave like a strict priority queue among all subqueues that
287 // are eligible to run.
288 for (typename SubQueues::iterator i = queue.begin();
289 i != queue.end();
290 ++i) {
291 assert(!(i->second.empty()));
292 if (i->second.front().first < i->second.num_tokens()) {
293 T ret = i->second.front().second;
294 unsigned cost = i->second.front().first;
295 i->second.take_tokens(cost);
296 i->second.pop_front();
297 if (i->second.empty()) {
298 remove_queue(i->first);
299 }
300 distribute_tokens(cost);
301 return ret;
302 }
303 }
304
305 // if no subqueues have sufficient tokens, we behave like a strict
306 // priority queue.
307 T ret = queue.rbegin()->second.front().second;
308 unsigned cost = queue.rbegin()->second.front().first;
309 queue.rbegin()->second.pop_front();
310 if (queue.rbegin()->second.empty()) {
311 remove_queue(queue.rbegin()->first);
312 }
313 distribute_tokens(cost);
314 return ret;
315 }
316
317 void dump(ceph::Formatter *f) const final {
318 f->dump_int("total_priority", total_priority);
319 f->dump_int("max_tokens_per_subqueue", max_tokens_per_subqueue);
320 f->dump_int("min_cost", min_cost);
321 f->open_array_section("high_queues");
322 for (typename SubQueues::const_iterator p = high_queue.begin();
323 p != high_queue.end();
324 ++p) {
325 f->open_object_section("subqueue");
326 f->dump_int("priority", p->first);
327 p->second.dump(f);
328 f->close_section();
329 }
330 f->close_section();
331 f->open_array_section("queues");
332 for (typename SubQueues::const_iterator p = queue.begin();
333 p != queue.end();
334 ++p) {
335 f->open_object_section("subqueue");
336 f->dump_int("priority", p->first);
337 p->second.dump(f);
338 f->close_section();
339 }
340 f->close_section();
341 }
342 };
343
344 #endif