]>
git.proxmox.com Git - ceph.git/blob - ceph/src/common/PrioritizedQueue.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef PRIORITY_QUEUE_H
16 #define PRIORITY_QUEUE_H
18 #include "common/Formatter.h"
19 #include "common/OpQueue.h"
22 * Manages queue for normal and strict priority items
24 * On dequeue, the queue will select the lowest priority queue
25 * such that the q has bucket > cost of front queue item.
27 * If there is no such queue, we choose the next queue item for
28 * the highest priority queue.
30 * Before returning a dequeued item, we place into each bucket
31 * cost * (priority/total_priority) tokens.
33 * enqueue_strict and enqueue_strict_front queue items into queues
34 * which are serviced in strict priority order before items queued
35 * with enqueue and enqueue_front
37 * Within a priority class, we schedule round robin based on the class
38 * of type K used to enqueue items. e.g. you could use entity_inst_t
39 * to provide fairness for different clients.
41 template <typename T
, typename K
>
42 class PrioritizedQueue
: public OpQueue
<T
, K
> {
43 int64_t total_priority
;
44 int64_t max_tokens_per_subqueue
;
47 typedef std::list
<std::pair
<unsigned, T
> > ListPairs
;
51 typedef std::map
<K
, ListPairs
> Classes
;
53 unsigned tokens
, max_tokens
;
55 typename
Classes::iterator cur
;
57 SubQueue(const SubQueue
&other
)
60 max_tokens(other
.max_tokens
),
66 size(0), cur(q
.begin()) {}
67 void set_max_tokens(unsigned mt
) {
70 unsigned get_max_tokens() const {
73 unsigned num_tokens() const {
76 void put_tokens(unsigned t
) {
78 if (tokens
> max_tokens
) {
82 void take_tokens(unsigned t
) {
89 void enqueue(K cl
, unsigned cost
, T item
) {
90 q
[cl
].push_back(std::make_pair(cost
, item
));
95 void enqueue_front(K cl
, unsigned cost
, T item
) {
96 q
[cl
].push_front(std::make_pair(cost
, item
));
101 std::pair
<unsigned, T
> front() const {
102 assert(!(q
.empty()));
103 assert(cur
!= q
.end());
104 return cur
->second
.front();
107 assert(!(q
.empty()));
108 assert(cur
!= q
.end());
109 cur
->second
.pop_front();
110 if (cur
->second
.empty()) {
115 if (cur
== q
.end()) {
120 unsigned length() const {
122 return (unsigned)size
;
127 void remove_by_class(K k
, std::list
<T
> *out
) {
128 typename
Classes::iterator i
= q
.find(k
);
132 size
-= i
->second
.size();
137 for (typename
ListPairs::reverse_iterator j
=
139 j
!= i
->second
.rend();
141 out
->push_front(j
->second
);
145 if (cur
== q
.end()) {
150 void dump(ceph::Formatter
*f
) const {
151 f
->dump_int("tokens", tokens
);
152 f
->dump_int("max_tokens", max_tokens
);
153 f
->dump_int("size", size
);
154 f
->dump_int("num_keys", q
.size());
156 f
->dump_int("first_item_cost", front().first
);
161 typedef std::map
<unsigned, SubQueue
> SubQueues
;
162 SubQueues high_queue
;
165 SubQueue
*create_queue(unsigned priority
) {
166 typename
SubQueues::iterator p
= queue
.find(priority
);
167 if (p
!= queue
.end()) {
170 total_priority
+= priority
;
171 SubQueue
*sq
= &queue
[priority
];
172 sq
->set_max_tokens(max_tokens_per_subqueue
);
176 void remove_queue(unsigned priority
) {
177 assert(queue
.count(priority
));
178 queue
.erase(priority
);
179 total_priority
-= priority
;
180 assert(total_priority
>= 0);
183 void distribute_tokens(unsigned cost
) {
184 if (total_priority
== 0) {
187 for (typename
SubQueues::iterator i
= queue
.begin();
190 i
->second
.put_tokens(((i
->first
* cost
) / total_priority
) + 1);
195 PrioritizedQueue(unsigned max_per
, unsigned min_c
)
197 max_tokens_per_subqueue(max_per
),
201 unsigned length() const final
{
203 for (typename
SubQueues::const_iterator i
= queue
.begin();
206 assert(i
->second
.length());
207 total
+= i
->second
.length();
209 for (typename
SubQueues::const_iterator i
= high_queue
.begin();
210 i
!= high_queue
.end();
212 assert(i
->second
.length());
213 total
+= i
->second
.length();
218 void remove_by_class(K k
, std::list
<T
> *out
= 0) final
{
219 for (typename
SubQueues::iterator i
= queue
.begin();
222 i
->second
.remove_by_class(k
, out
);
223 if (i
->second
.empty()) {
224 unsigned priority
= i
->first
;
226 remove_queue(priority
);
231 for (typename
SubQueues::iterator i
= high_queue
.begin();
232 i
!= high_queue
.end();
234 i
->second
.remove_by_class(k
, out
);
235 if (i
->second
.empty()) {
236 high_queue
.erase(i
++);
243 void enqueue_strict(K cl
, unsigned priority
, T item
) final
{
244 high_queue
[priority
].enqueue(cl
, 0, item
);
247 void enqueue_strict_front(K cl
, unsigned priority
, T item
) final
{
248 high_queue
[priority
].enqueue_front(cl
, 0, item
);
251 void enqueue(K cl
, unsigned priority
, unsigned cost
, T item
) final
{
254 if (cost
> max_tokens_per_subqueue
)
255 cost
= max_tokens_per_subqueue
;
256 create_queue(priority
)->enqueue(cl
, cost
, item
);
259 void enqueue_front(K cl
, unsigned priority
, unsigned cost
, T item
) final
{
262 if (cost
> max_tokens_per_subqueue
)
263 cost
= max_tokens_per_subqueue
;
264 create_queue(priority
)->enqueue_front(cl
, cost
, item
);
267 bool empty() const final
{
268 assert(total_priority
>= 0);
269 assert((total_priority
== 0) || !(queue
.empty()));
270 return queue
.empty() && high_queue
.empty();
276 if (!(high_queue
.empty())) {
277 T ret
= high_queue
.rbegin()->second
.front().second
;
278 high_queue
.rbegin()->second
.pop_front();
279 if (high_queue
.rbegin()->second
.empty()) {
280 high_queue
.erase(high_queue
.rbegin()->first
);
285 // if there are multiple buckets/subqueues with sufficient tokens,
286 // we behave like a strict priority queue among all subqueues that
287 // are eligible to run.
288 for (typename
SubQueues::iterator i
= queue
.begin();
291 assert(!(i
->second
.empty()));
292 if (i
->second
.front().first
< i
->second
.num_tokens()) {
293 T ret
= i
->second
.front().second
;
294 unsigned cost
= i
->second
.front().first
;
295 i
->second
.take_tokens(cost
);
296 i
->second
.pop_front();
297 if (i
->second
.empty()) {
298 remove_queue(i
->first
);
300 distribute_tokens(cost
);
305 // if no subqueues have sufficient tokens, we behave like a strict
307 T ret
= queue
.rbegin()->second
.front().second
;
308 unsigned cost
= queue
.rbegin()->second
.front().first
;
309 queue
.rbegin()->second
.pop_front();
310 if (queue
.rbegin()->second
.empty()) {
311 remove_queue(queue
.rbegin()->first
);
313 distribute_tokens(cost
);
317 void dump(ceph::Formatter
*f
) const final
{
318 f
->dump_int("total_priority", total_priority
);
319 f
->dump_int("max_tokens_per_subqueue", max_tokens_per_subqueue
);
320 f
->dump_int("min_cost", min_cost
);
321 f
->open_array_section("high_queues");
322 for (typename
SubQueues::const_iterator p
= high_queue
.begin();
323 p
!= high_queue
.end();
325 f
->open_object_section("subqueue");
326 f
->dump_int("priority", p
->first
);
331 f
->open_array_section("queues");
332 for (typename
SubQueues::const_iterator p
= queue
.begin();
335 f
->open_object_section("subqueue");
336 f
->dump_int("priority", p
->first
);