]> git.proxmox.com Git - ceph.git/blame - ceph/src/common/PrioritizedQueue.h
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / common / PrioritizedQueue.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#ifndef PRIORITY_QUEUE_H
16#define PRIORITY_QUEUE_H
17
11fdf7f2
TL
18#include "include/ceph_assert.h"
19
7c673cae
FG
20#include "common/Formatter.h"
21#include "common/OpQueue.h"
22
7c673cae
FG
23/**
24 * Manages queue for normal and strict priority items
25 *
26 * On dequeue, the queue will select the lowest priority queue
27 * such that the q has bucket > cost of front queue item.
28 *
29 * If there is no such queue, we choose the next queue item for
30 * the highest priority queue.
31 *
32 * Before returning a dequeued item, we place into each bucket
33 * cost * (priority/total_priority) tokens.
34 *
35 * enqueue_strict and enqueue_strict_front queue items into queues
36 * which are serviced in strict priority order before items queued
37 * with enqueue and enqueue_front
38 *
39 * Within a priority class, we schedule round robin based on the class
40 * of type K used to enqueue items. e.g. you could use entity_inst_t
41 * to provide fairness for different clients.
42 */
43template <typename T, typename K>
44class PrioritizedQueue : public OpQueue <T, K> {
45 int64_t total_priority;
46 int64_t max_tokens_per_subqueue;
47 int64_t min_cost;
48
49 typedef std::list<std::pair<unsigned, T> > ListPairs;
7c673cae
FG
50
51 struct SubQueue {
52 private:
53 typedef std::map<K, ListPairs> Classes;
54 Classes q;
55 unsigned tokens, max_tokens;
56 int64_t size;
57 typename Classes::iterator cur;
58 public:
59 SubQueue(const SubQueue &other)
60 : q(other.q),
61 tokens(other.tokens),
62 max_tokens(other.max_tokens),
63 size(other.size),
64 cur(q.begin()) {}
65 SubQueue()
66 : tokens(0),
67 max_tokens(0),
68 size(0), cur(q.begin()) {}
69 void set_max_tokens(unsigned mt) {
70 max_tokens = mt;
71 }
72 unsigned get_max_tokens() const {
73 return max_tokens;
74 }
75 unsigned num_tokens() const {
76 return tokens;
77 }
78 void put_tokens(unsigned t) {
79 tokens += t;
80 if (tokens > max_tokens) {
81 tokens = max_tokens;
82 }
83 }
84 void take_tokens(unsigned t) {
85 if (tokens > t) {
86 tokens -= t;
87 } else {
88 tokens = 0;
89 }
90 }
11fdf7f2
TL
91 void enqueue(K cl, unsigned cost, T &&item) {
92 q[cl].push_back(std::make_pair(cost, std::move(item)));
7c673cae
FG
93 if (cur == q.end())
94 cur = q.begin();
95 size++;
96 }
11fdf7f2
TL
97 void enqueue_front(K cl, unsigned cost, T &&item) {
98 q[cl].push_front(std::make_pair(cost, std::move(item)));
7c673cae
FG
99 if (cur == q.end())
100 cur = q.begin();
101 size++;
102 }
11fdf7f2
TL
103 std::pair<unsigned, T> &front() const {
104 ceph_assert(!(q.empty()));
105 ceph_assert(cur != q.end());
7c673cae
FG
106 return cur->second.front();
107 }
11fdf7f2
TL
108 T pop_front() {
109 ceph_assert(!(q.empty()));
110 ceph_assert(cur != q.end());
111 T ret = std::move(cur->second.front().second);
7c673cae
FG
112 cur->second.pop_front();
113 if (cur->second.empty()) {
114 q.erase(cur++);
115 } else {
116 ++cur;
117 }
118 if (cur == q.end()) {
119 cur = q.begin();
120 }
121 size--;
11fdf7f2 122 return ret;
7c673cae
FG
123 }
124 unsigned length() const {
11fdf7f2 125 ceph_assert(size >= 0);
7c673cae
FG
126 return (unsigned)size;
127 }
128 bool empty() const {
129 return q.empty();
130 }
7c673cae
FG
131 void remove_by_class(K k, std::list<T> *out) {
132 typename Classes::iterator i = q.find(k);
133 if (i == q.end()) {
134 return;
135 }
136 size -= i->second.size();
137 if (i == cur) {
138 ++cur;
139 }
140 if (out) {
141 for (typename ListPairs::reverse_iterator j =
142 i->second.rbegin();
143 j != i->second.rend();
144 ++j) {
11fdf7f2 145 out->push_front(std::move(j->second));
7c673cae
FG
146 }
147 }
148 q.erase(i);
149 if (cur == q.end()) {
150 cur = q.begin();
151 }
152 }
153
154 void dump(ceph::Formatter *f) const {
155 f->dump_int("tokens", tokens);
156 f->dump_int("max_tokens", max_tokens);
157 f->dump_int("size", size);
158 f->dump_int("num_keys", q.size());
159 if (!empty()) {
160 f->dump_int("first_item_cost", front().first);
161 }
162 }
163 };
164
165 typedef std::map<unsigned, SubQueue> SubQueues;
166 SubQueues high_queue;
167 SubQueues queue;
168
169 SubQueue *create_queue(unsigned priority) {
170 typename SubQueues::iterator p = queue.find(priority);
171 if (p != queue.end()) {
172 return &p->second;
173 }
174 total_priority += priority;
175 SubQueue *sq = &queue[priority];
176 sq->set_max_tokens(max_tokens_per_subqueue);
177 return sq;
178 }
179
180 void remove_queue(unsigned priority) {
11fdf7f2 181 ceph_assert(queue.count(priority));
7c673cae
FG
182 queue.erase(priority);
183 total_priority -= priority;
11fdf7f2 184 ceph_assert(total_priority >= 0);
7c673cae
FG
185 }
186
187 void distribute_tokens(unsigned cost) {
188 if (total_priority == 0) {
189 return;
190 }
191 for (typename SubQueues::iterator i = queue.begin();
192 i != queue.end();
193 ++i) {
194 i->second.put_tokens(((i->first * cost) / total_priority) + 1);
195 }
196 }
197
198public:
199 PrioritizedQueue(unsigned max_per, unsigned min_c)
200 : total_priority(0),
201 max_tokens_per_subqueue(max_per),
202 min_cost(min_c)
203 {}
204
11fdf7f2 205 unsigned length() const {
7c673cae
FG
206 unsigned total = 0;
207 for (typename SubQueues::const_iterator i = queue.begin();
208 i != queue.end();
209 ++i) {
11fdf7f2 210 ceph_assert(i->second.length());
7c673cae
FG
211 total += i->second.length();
212 }
213 for (typename SubQueues::const_iterator i = high_queue.begin();
214 i != high_queue.end();
215 ++i) {
11fdf7f2 216 ceph_assert(i->second.length());
7c673cae
FG
217 total += i->second.length();
218 }
219 return total;
220 }
221
7c673cae
FG
222 void remove_by_class(K k, std::list<T> *out = 0) final {
223 for (typename SubQueues::iterator i = queue.begin();
224 i != queue.end();
225 ) {
226 i->second.remove_by_class(k, out);
227 if (i->second.empty()) {
228 unsigned priority = i->first;
229 ++i;
230 remove_queue(priority);
231 } else {
232 ++i;
233 }
234 }
235 for (typename SubQueues::iterator i = high_queue.begin();
236 i != high_queue.end();
237 ) {
238 i->second.remove_by_class(k, out);
239 if (i->second.empty()) {
240 high_queue.erase(i++);
241 } else {
242 ++i;
243 }
244 }
245 }
246
11fdf7f2
TL
247 void enqueue_strict(K cl, unsigned priority, T&& item) final {
248 high_queue[priority].enqueue(cl, 0, std::move(item));
7c673cae
FG
249 }
250
11fdf7f2
TL
251 void enqueue_strict_front(K cl, unsigned priority, T&& item) final {
252 high_queue[priority].enqueue_front(cl, 0, std::move(item));
7c673cae
FG
253 }
254
11fdf7f2 255 void enqueue(K cl, unsigned priority, unsigned cost, T&& item) final {
7c673cae
FG
256 if (cost < min_cost)
257 cost = min_cost;
258 if (cost > max_tokens_per_subqueue)
259 cost = max_tokens_per_subqueue;
11fdf7f2 260 create_queue(priority)->enqueue(cl, cost, std::move(item));
7c673cae
FG
261 }
262
11fdf7f2 263 void enqueue_front(K cl, unsigned priority, unsigned cost, T&& item) final {
7c673cae
FG
264 if (cost < min_cost)
265 cost = min_cost;
266 if (cost > max_tokens_per_subqueue)
267 cost = max_tokens_per_subqueue;
11fdf7f2 268 create_queue(priority)->enqueue_front(cl, cost, std::move(item));
7c673cae
FG
269 }
270
271 bool empty() const final {
11fdf7f2
TL
272 ceph_assert(total_priority >= 0);
273 ceph_assert((total_priority == 0) || !(queue.empty()));
7c673cae
FG
274 return queue.empty() && high_queue.empty();
275 }
276
277 T dequeue() final {
11fdf7f2 278 ceph_assert(!empty());
7c673cae
FG
279
280 if (!(high_queue.empty())) {
11fdf7f2 281 T ret = std::move(high_queue.rbegin()->second.front().second);
7c673cae
FG
282 high_queue.rbegin()->second.pop_front();
283 if (high_queue.rbegin()->second.empty()) {
284 high_queue.erase(high_queue.rbegin()->first);
285 }
286 return ret;
287 }
288
289 // if there are multiple buckets/subqueues with sufficient tokens,
290 // we behave like a strict priority queue among all subqueues that
291 // are eligible to run.
292 for (typename SubQueues::iterator i = queue.begin();
293 i != queue.end();
294 ++i) {
11fdf7f2 295 ceph_assert(!(i->second.empty()));
7c673cae 296 if (i->second.front().first < i->second.num_tokens()) {
7c673cae
FG
297 unsigned cost = i->second.front().first;
298 i->second.take_tokens(cost);
11fdf7f2 299 T ret = std::move(i->second.front().second);
7c673cae
FG
300 i->second.pop_front();
301 if (i->second.empty()) {
302 remove_queue(i->first);
303 }
304 distribute_tokens(cost);
305 return ret;
306 }
307 }
308
309 // if no subqueues have sufficient tokens, we behave like a strict
310 // priority queue.
7c673cae 311 unsigned cost = queue.rbegin()->second.front().first;
11fdf7f2 312 T ret = std::move(queue.rbegin()->second.front().second);
7c673cae
FG
313 queue.rbegin()->second.pop_front();
314 if (queue.rbegin()->second.empty()) {
315 remove_queue(queue.rbegin()->first);
316 }
317 distribute_tokens(cost);
318 return ret;
319 }
320
321 void dump(ceph::Formatter *f) const final {
322 f->dump_int("total_priority", total_priority);
323 f->dump_int("max_tokens_per_subqueue", max_tokens_per_subqueue);
324 f->dump_int("min_cost", min_cost);
325 f->open_array_section("high_queues");
326 for (typename SubQueues::const_iterator p = high_queue.begin();
327 p != high_queue.end();
328 ++p) {
329 f->open_object_section("subqueue");
330 f->dump_int("priority", p->first);
331 p->second.dump(f);
332 f->close_section();
333 }
334 f->close_section();
335 f->open_array_section("queues");
336 for (typename SubQueues::const_iterator p = queue.begin();
337 p != queue.end();
338 ++p) {
339 f->open_object_section("subqueue");
340 f->dump_int("priority", p->first);
341 p->second.dump(f);
342 f->close_section();
343 }
344 f->close_section();
345 }
9f95a23c
TL
346
347 void print(std::ostream &ostream) const final {
348 ostream << "PrioritizedQueue";
349 }
7c673cae
FG
350};
351
352#endif