1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
20 #include <boost/intrusive/list.hpp>
21 #include <boost/intrusive/rbtree.hpp>
22 #include <boost/intrusive/avl_set.hpp>
24 namespace bi
= boost::intrusive
;
26 template <typename T
, typename S
>
30 bool operator()(const S i
, const T
&k
) const
34 bool operator()(const T
&k
, const S i
) const
44 void operator()(T
* delete_this
)
45 { delete delete_this
; }
48 template <typename T
, typename K
>
49 class WeightedPriorityQueue
: public OpQueue
<T
, K
>
52 class ListPair
: public bi::list_base_hook
<>
57 ListPair(unsigned c
, T
& i
) :
62 class Klass
: public bi::set_base_hook
<>
64 typedef bi::list
<ListPair
> ListPairs
;
65 typedef typename
ListPairs::iterator Lit
;
72 friend bool operator< (const Klass
&a
, const Klass
&b
)
73 { return a
.key
< b
.key
; }
74 friend bool operator> (const Klass
&a
, const Klass
&b
)
75 { return a
.key
> b
.key
; }
76 friend bool operator== (const Klass
&a
, const Klass
&b
)
77 { return a
.key
== b
.key
; }
78 void insert(unsigned cost
, T
& item
, bool front
) {
80 lp
.push_front(*new ListPair(cost
, item
));
82 lp
.push_back(*new ListPair(cost
, item
));
85 //Get the cost of the next item to dequeue
86 unsigned get_cost() const {
88 return lp
.begin()->cost
;
92 T ret
= lp
.begin()->item
;
93 lp
.erase_and_dispose(lp
.begin(), DelItem
<ListPair
>());
99 unsigned get_size() const {
102 unsigned filter_list_pairs(std::function
<bool (T
)>& f
) {
104 // intrusive containers can't erase with a reverse_iterator
105 // so we have to walk backwards on our own. Since there is
106 // no iterator before begin, we have to test at the end.
107 for (Lit i
= --lp
.end();; --i
) {
109 i
= lp
.erase_and_dispose(i
, DelItem
<ListPair
>());
112 if (i
== lp
.begin()) {
118 unsigned filter_class(std::list
<T
>* out
) {
120 for (Lit i
= --lp
.end();; --i
) {
122 out
->push_front(i
->item
);
124 i
= lp
.erase_and_dispose(i
, DelItem
<ListPair
>());
126 if (i
== lp
.begin()) {
133 class SubQueue
: public bi::set_base_hook
<>
135 typedef bi::rbtree
<Klass
> Klasses
;
136 typedef typename
Klasses::iterator Kit
;
138 if (next
== klasses
.end()) {
139 next
= klasses
.begin();
143 unsigned key
; // priority
146 SubQueue(unsigned& p
) :
148 next(klasses
.begin())
150 friend bool operator< (const SubQueue
&a
, const SubQueue
&b
)
151 { return a
.key
< b
.key
; }
152 friend bool operator> (const SubQueue
&a
, const SubQueue
&b
)
153 { return a
.key
> b
.key
; }
154 friend bool operator== (const SubQueue
&a
, const SubQueue
&b
)
155 { return a
.key
== b
.key
; }
157 return klasses
.empty();
159 void insert(K cl
, unsigned cost
, T
& item
, bool front
= false) {
160 typename
Klasses::insert_commit_data insert_data
;
161 std::pair
<Kit
, bool> ret
=
162 klasses
.insert_unique_check(cl
, MapKey
<Klass
, K
>(), insert_data
);
164 ret
.first
= klasses
.insert_unique_commit(*new Klass(cl
), insert_data
);
167 ret
.first
->insert(cost
, item
, front
);
169 unsigned get_cost() const {
171 return next
->get_cost();
176 next
= klasses
.erase_and_dispose(next
, DelItem
<Klass
>());
183 unsigned filter_list_pairs(std::function
<bool (T
)>& f
) {
185 // intrusive containers can't erase with a reverse_iterator
186 // so we have to walk backwards on our own. Since there is
187 // no iterator before begin, we have to test at the end.
188 for (Kit i
= klasses
.begin(); i
!= klasses
.end();) {
189 count
+= i
->filter_list_pairs(f
);
194 i
= klasses
.erase_and_dispose(i
, DelItem
<Klass
>());
202 unsigned filter_class(K
& cl
, std::list
<T
>* out
) {
204 Kit i
= klasses
.find(cl
, MapKey
<Klass
, K
>());
205 if (i
!= klasses
.end()) {
206 count
= i
->filter_class(out
);
207 Kit tmp
= klasses
.erase_and_dispose(i
, DelItem
<Klass
>());
215 void dump(ceph::Formatter
*f
) const {
216 f
->dump_int("num_keys", next
->get_size());
218 f
->dump_int("first_item_cost", next
->get_cost());
223 typedef bi::rbtree
<SubQueue
> SubQueues
;
224 typedef typename
SubQueues::iterator Sit
;
238 void insert(unsigned p
, K cl
, unsigned cost
, T
& item
, bool front
= false) {
239 typename
SubQueues::insert_commit_data insert_data
;
240 std::pair
<typename
SubQueues::iterator
, bool> ret
=
241 queues
.insert_unique_check(p
, MapKey
<SubQueue
, unsigned>(), insert_data
);
243 ret
.first
= queues
.insert_unique_commit(*new SubQueue(p
), insert_data
);
246 ret
.first
->insert(cl
, cost
, item
, front
);
247 if (cost
> max_cost
) {
252 T
pop(bool strict
= false) {
254 Sit i
= --queues
.end();
258 queues
.erase_and_dispose(i
, DelItem
<SubQueue
>());
262 if (queues
.size() > 1) {
264 // Pick a new priority out of the total priority.
265 unsigned prio
= rand() % total_prio
+ 1;
266 unsigned tp
= total_prio
- i
->key
;
267 // Find the priority coresponding to the picked number.
268 // Subtract high priorities to low priorities until the picked number
269 // is more than the total and try to dequeue that priority.
270 // Reverse the direction from previous implementation because there is a higher
271 // chance of dequeuing a high priority op so spend less time spinning.
276 // Flip a coin to see if this priority gets to run based on cost.
277 // The next op's cost is multiplied by .9 and subtracted from the
278 // max cost seen. Ops with lower costs will have a larger value
279 // and allow them to be selected easier than ops with high costs.
280 if (max_cost
== 0 || rand() % max_cost
<=
281 (max_cost
- ((i
->get_cost() * 9) / 10))) {
289 total_prio
-= i
->key
;
290 queues
.erase_and_dispose(i
, DelItem
<SubQueue
>());
294 void filter_list_pairs(std::function
<bool (T
)>& f
) {
295 for (Sit i
= queues
.begin(); i
!= queues
.end();) {
296 size
-= i
->filter_list_pairs(f
);
298 total_prio
-= i
->key
;
299 i
= queues
.erase_and_dispose(i
, DelItem
<SubQueue
>());
305 void filter_class(K
& cl
, std::list
<T
>* out
) {
306 for (Sit i
= queues
.begin(); i
!= queues
.end();) {
307 size
-= i
->filter_class(cl
, out
);
309 total_prio
-= i
->key
;
310 i
= queues
.erase_and_dispose(i
, DelItem
<SubQueue
>());
316 void dump(ceph::Formatter
*f
) const {
317 for (typename
SubQueues::const_iterator i
= queues
.begin();
318 i
!= queues
.end(); ++i
) {
319 f
->dump_int("total_priority", total_prio
);
320 f
->dump_int("max_cost", max_cost
);
321 f
->open_object_section("subqueue");
322 f
->dump_int("priority", i
->key
);
332 WeightedPriorityQueue(unsigned max_per
, unsigned min_c
) :
338 unsigned length() const final
{
339 return strict
.size
+ normal
.size
;
341 void remove_by_filter(std::function
<bool (T
)> f
) final
{
342 strict
.filter_list_pairs(f
);
343 normal
.filter_list_pairs(f
);
345 void remove_by_class(K cl
, std::list
<T
>* removed
= 0) final
{
346 strict
.filter_class(cl
, removed
);
347 normal
.filter_class(cl
, removed
);
349 bool empty() const final
{
350 return !(strict
.size
+ normal
.size
);
352 void enqueue_strict(K cl
, unsigned p
, T item
) final
{
353 strict
.insert(p
, cl
, 0, item
);
355 void enqueue_strict_front(K cl
, unsigned p
, T item
) final
{
356 strict
.insert(p
, cl
, 0, item
, true);
358 void enqueue(K cl
, unsigned p
, unsigned cost
, T item
) final
{
359 normal
.insert(p
, cl
, cost
, item
);
361 void enqueue_front(K cl
, unsigned p
, unsigned cost
, T item
) final
{
362 normal
.insert(p
, cl
, cost
, item
, true);
364 T
dequeue() override
{
365 assert(strict
.size
+ normal
.size
> 0);
366 if (!strict
.empty()) {
367 return strict
.pop(true);
371 void dump(ceph::Formatter
*f
) const override
{
372 f
->open_array_section("high_queues");
375 f
->open_array_section("queues");