]>
git.proxmox.com Git - ceph.git/blob - ceph/src/common/PrioritizedQueue.h
8d9cd95b28e418d1675729e4e020da48cb53e61c
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef PRIORITY_QUEUE_H
16 #define PRIORITY_QUEUE_H
18 #include "common/Formatter.h"
19 #include "common/OpQueue.h"
22 * Manages queue for normal and strict priority items
24 * On dequeue, the queue will select the lowest priority queue
25 * such that the q has bucket > cost of front queue item.
27 * If there is no such queue, we choose the next queue item for
28 * the highest priority queue.
30 * Before returning a dequeued item, we place into each bucket
31 * cost * (priority/total_priority) tokens.
33 * enqueue_strict and enqueue_strict_front queue items into queues
34 * which are serviced in strict priority order before items queued
35 * with enqueue and enqueue_front
37 * Within a priority class, we schedule round robin based on the class
38 * of type K used to enqueue items. e.g. you could use entity_inst_t
39 * to provide fairness for different clients.
41 template <typename T
, typename K
>
42 class PrioritizedQueue
: public OpQueue
<T
, K
> {
43 int64_t total_priority
;
44 int64_t max_tokens_per_subqueue
;
47 typedef std::list
<std::pair
<unsigned, T
> > ListPairs
;
48 static unsigned filter_list_pairs(
50 std::function
<bool (T
)> f
) {
52 for (typename
ListPairs::iterator i
= l
->end();
57 if (f(next
->second
)) {
69 typedef std::map
<K
, ListPairs
> Classes
;
71 unsigned tokens
, max_tokens
;
73 typename
Classes::iterator cur
;
75 SubQueue(const SubQueue
&other
)
78 max_tokens(other
.max_tokens
),
84 size(0), cur(q
.begin()) {}
85 void set_max_tokens(unsigned mt
) {
88 unsigned get_max_tokens() const {
91 unsigned num_tokens() const {
94 void put_tokens(unsigned t
) {
96 if (tokens
> max_tokens
) {
100 void take_tokens(unsigned t
) {
107 void enqueue(K cl
, unsigned cost
, T item
) {
108 q
[cl
].push_back(std::make_pair(cost
, item
));
113 void enqueue_front(K cl
, unsigned cost
, T item
) {
114 q
[cl
].push_front(std::make_pair(cost
, item
));
119 std::pair
<unsigned, T
> front() const {
120 assert(!(q
.empty()));
121 assert(cur
!= q
.end());
122 return cur
->second
.front();
125 assert(!(q
.empty()));
126 assert(cur
!= q
.end());
127 cur
->second
.pop_front();
128 if (cur
->second
.empty()) {
133 if (cur
== q
.end()) {
138 unsigned length() const {
140 return (unsigned)size
;
145 void remove_by_filter(
146 std::function
<bool (T
)> f
) {
147 for (typename
Classes::iterator i
= q
.begin();
150 size
-= filter_list_pairs(&(i
->second
), f
);
151 if (i
->second
.empty()) {
163 void remove_by_class(K k
, std::list
<T
> *out
) {
164 typename
Classes::iterator i
= q
.find(k
);
168 size
-= i
->second
.size();
173 for (typename
ListPairs::reverse_iterator j
=
175 j
!= i
->second
.rend();
177 out
->push_front(j
->second
);
181 if (cur
== q
.end()) {
186 void dump(ceph::Formatter
*f
) const {
187 f
->dump_int("tokens", tokens
);
188 f
->dump_int("max_tokens", max_tokens
);
189 f
->dump_int("size", size
);
190 f
->dump_int("num_keys", q
.size());
192 f
->dump_int("first_item_cost", front().first
);
197 typedef std::map
<unsigned, SubQueue
> SubQueues
;
198 SubQueues high_queue
;
201 SubQueue
*create_queue(unsigned priority
) {
202 typename
SubQueues::iterator p
= queue
.find(priority
);
203 if (p
!= queue
.end()) {
206 total_priority
+= priority
;
207 SubQueue
*sq
= &queue
[priority
];
208 sq
->set_max_tokens(max_tokens_per_subqueue
);
212 void remove_queue(unsigned priority
) {
213 assert(queue
.count(priority
));
214 queue
.erase(priority
);
215 total_priority
-= priority
;
216 assert(total_priority
>= 0);
219 void distribute_tokens(unsigned cost
) {
220 if (total_priority
== 0) {
223 for (typename
SubQueues::iterator i
= queue
.begin();
226 i
->second
.put_tokens(((i
->first
* cost
) / total_priority
) + 1);
231 PrioritizedQueue(unsigned max_per
, unsigned min_c
)
233 max_tokens_per_subqueue(max_per
),
237 unsigned length() const final
{
239 for (typename
SubQueues::const_iterator i
= queue
.begin();
242 assert(i
->second
.length());
243 total
+= i
->second
.length();
245 for (typename
SubQueues::const_iterator i
= high_queue
.begin();
246 i
!= high_queue
.end();
248 assert(i
->second
.length());
249 total
+= i
->second
.length();
254 void remove_by_filter(
255 std::function
<bool (T
)> f
) final
{
256 for (typename
SubQueues::iterator i
= queue
.begin();
259 unsigned priority
= i
->first
;
261 i
->second
.remove_by_filter(f
);
262 if (i
->second
.empty()) {
264 remove_queue(priority
);
269 for (typename
SubQueues::iterator i
= high_queue
.begin();
270 i
!= high_queue
.end();
272 i
->second
.remove_by_filter(f
);
273 if (i
->second
.empty()) {
274 high_queue
.erase(i
++);
281 void remove_by_class(K k
, std::list
<T
> *out
= 0) final
{
282 for (typename
SubQueues::iterator i
= queue
.begin();
285 i
->second
.remove_by_class(k
, out
);
286 if (i
->second
.empty()) {
287 unsigned priority
= i
->first
;
289 remove_queue(priority
);
294 for (typename
SubQueues::iterator i
= high_queue
.begin();
295 i
!= high_queue
.end();
297 i
->second
.remove_by_class(k
, out
);
298 if (i
->second
.empty()) {
299 high_queue
.erase(i
++);
306 void enqueue_strict(K cl
, unsigned priority
, T item
) final
{
307 high_queue
[priority
].enqueue(cl
, 0, item
);
310 void enqueue_strict_front(K cl
, unsigned priority
, T item
) final
{
311 high_queue
[priority
].enqueue_front(cl
, 0, item
);
314 void enqueue(K cl
, unsigned priority
, unsigned cost
, T item
) final
{
317 if (cost
> max_tokens_per_subqueue
)
318 cost
= max_tokens_per_subqueue
;
319 create_queue(priority
)->enqueue(cl
, cost
, item
);
322 void enqueue_front(K cl
, unsigned priority
, unsigned cost
, T item
) final
{
325 if (cost
> max_tokens_per_subqueue
)
326 cost
= max_tokens_per_subqueue
;
327 create_queue(priority
)->enqueue_front(cl
, cost
, item
);
330 bool empty() const final
{
331 assert(total_priority
>= 0);
332 assert((total_priority
== 0) || !(queue
.empty()));
333 return queue
.empty() && high_queue
.empty();
339 if (!(high_queue
.empty())) {
340 T ret
= high_queue
.rbegin()->second
.front().second
;
341 high_queue
.rbegin()->second
.pop_front();
342 if (high_queue
.rbegin()->second
.empty()) {
343 high_queue
.erase(high_queue
.rbegin()->first
);
348 // if there are multiple buckets/subqueues with sufficient tokens,
349 // we behave like a strict priority queue among all subqueues that
350 // are eligible to run.
351 for (typename
SubQueues::iterator i
= queue
.begin();
354 assert(!(i
->second
.empty()));
355 if (i
->second
.front().first
< i
->second
.num_tokens()) {
356 T ret
= i
->second
.front().second
;
357 unsigned cost
= i
->second
.front().first
;
358 i
->second
.take_tokens(cost
);
359 i
->second
.pop_front();
360 if (i
->second
.empty()) {
361 remove_queue(i
->first
);
363 distribute_tokens(cost
);
368 // if no subqueues have sufficient tokens, we behave like a strict
370 T ret
= queue
.rbegin()->second
.front().second
;
371 unsigned cost
= queue
.rbegin()->second
.front().first
;
372 queue
.rbegin()->second
.pop_front();
373 if (queue
.rbegin()->second
.empty()) {
374 remove_queue(queue
.rbegin()->first
);
376 distribute_tokens(cost
);
380 void dump(ceph::Formatter
*f
) const final
{
381 f
->dump_int("total_priority", total_priority
);
382 f
->dump_int("max_tokens_per_subqueue", max_tokens_per_subqueue
);
383 f
->dump_int("min_cost", min_cost
);
384 f
->open_array_section("high_queues");
385 for (typename
SubQueues::const_iterator p
= high_queue
.begin();
386 p
!= high_queue
.end();
388 f
->open_object_section("subqueue");
389 f
->dump_int("priority", p
->first
);
394 f
->open_array_section("queues");
395 for (typename
SubQueues::const_iterator p
= queue
.begin();
398 f
->open_object_section("subqueue");
399 f
->dump_int("priority", p
->first
);