1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
20 #include <boost/intrusive/list.hpp>
21 #include <boost/intrusive/rbtree.hpp>
22 #include <boost/intrusive/avl_set.hpp>
24 namespace bi
= boost::intrusive
;
26 template <typename T
, typename S
>
30 bool operator()(const S i
, const T
&k
) const
34 bool operator()(const T
&k
, const S i
) const
44 void operator()(T
* delete_this
)
45 { delete delete_this
; }
48 template <typename T
, typename K
>
49 class WeightedPriorityQueue
: public OpQueue
<T
, K
>
52 class ListPair
: public bi::list_base_hook
<>
57 ListPair(unsigned c
, T
& i
) :
62 class Klass
: public bi::set_base_hook
<>
64 typedef bi::list
<ListPair
> ListPairs
;
65 typedef typename
ListPairs::iterator Lit
;
72 friend bool operator< (const Klass
&a
, const Klass
&b
)
73 { return a
.key
< b
.key
; }
74 friend bool operator> (const Klass
&a
, const Klass
&b
)
75 { return a
.key
> b
.key
; }
76 friend bool operator== (const Klass
&a
, const Klass
&b
)
77 { return a
.key
== b
.key
; }
78 void insert(unsigned cost
, T
& item
, bool front
) {
80 lp
.push_front(*new ListPair(cost
, item
));
82 lp
.push_back(*new ListPair(cost
, item
));
85 //Get the cost of the next item to dequeue
86 unsigned get_cost() const {
88 return lp
.begin()->cost
;
92 T ret
= lp
.begin()->item
;
93 lp
.erase_and_dispose(lp
.begin(), DelItem
<ListPair
>());
99 unsigned get_size() const {
102 unsigned filter_class(std::list
<T
>* out
) {
104 for (Lit i
= --lp
.end();; --i
) {
106 out
->push_front(i
->item
);
108 i
= lp
.erase_and_dispose(i
, DelItem
<ListPair
>());
110 if (i
== lp
.begin()) {
117 class SubQueue
: public bi::set_base_hook
<>
119 typedef bi::rbtree
<Klass
> Klasses
;
120 typedef typename
Klasses::iterator Kit
;
122 if (next
== klasses
.end()) {
123 next
= klasses
.begin();
127 unsigned key
; // priority
130 SubQueue(unsigned& p
) :
132 next(klasses
.begin())
134 friend bool operator< (const SubQueue
&a
, const SubQueue
&b
)
135 { return a
.key
< b
.key
; }
136 friend bool operator> (const SubQueue
&a
, const SubQueue
&b
)
137 { return a
.key
> b
.key
; }
138 friend bool operator== (const SubQueue
&a
, const SubQueue
&b
)
139 { return a
.key
== b
.key
; }
141 return klasses
.empty();
143 void insert(K cl
, unsigned cost
, T
& item
, bool front
= false) {
144 typename
Klasses::insert_commit_data insert_data
;
145 std::pair
<Kit
, bool> ret
=
146 klasses
.insert_unique_check(cl
, MapKey
<Klass
, K
>(), insert_data
);
148 ret
.first
= klasses
.insert_unique_commit(*new Klass(cl
), insert_data
);
151 ret
.first
->insert(cost
, item
, front
);
153 unsigned get_cost() const {
155 return next
->get_cost();
160 next
= klasses
.erase_and_dispose(next
, DelItem
<Klass
>());
167 unsigned filter_class(K
& cl
, std::list
<T
>* out
) {
169 Kit i
= klasses
.find(cl
, MapKey
<Klass
, K
>());
170 if (i
!= klasses
.end()) {
171 count
= i
->filter_class(out
);
172 Kit tmp
= klasses
.erase_and_dispose(i
, DelItem
<Klass
>());
180 void dump(ceph::Formatter
*f
) const {
181 f
->dump_int("num_keys", next
->get_size());
183 f
->dump_int("first_item_cost", next
->get_cost());
188 typedef bi::rbtree
<SubQueue
> SubQueues
;
189 typedef typename
SubQueues::iterator Sit
;
203 void insert(unsigned p
, K cl
, unsigned cost
, T
& item
, bool front
= false) {
204 typename
SubQueues::insert_commit_data insert_data
;
205 std::pair
<typename
SubQueues::iterator
, bool> ret
=
206 queues
.insert_unique_check(p
, MapKey
<SubQueue
, unsigned>(), insert_data
);
208 ret
.first
= queues
.insert_unique_commit(*new SubQueue(p
), insert_data
);
211 ret
.first
->insert(cl
, cost
, item
, front
);
212 if (cost
> max_cost
) {
217 T
pop(bool strict
= false) {
219 Sit i
= --queues
.end();
223 queues
.erase_and_dispose(i
, DelItem
<SubQueue
>());
227 if (queues
.size() > 1) {
229 // Pick a new priority out of the total priority.
230 unsigned prio
= rand() % total_prio
+ 1;
231 unsigned tp
= total_prio
- i
->key
;
232 // Find the priority corresponding to the picked number.
233 // Subtract high priorities to low priorities until the picked number
234 // is more than the total and try to dequeue that priority.
235 // Reverse the direction from previous implementation because there is a higher
236 // chance of dequeuing a high priority op so spend less time spinning.
241 // Flip a coin to see if this priority gets to run based on cost.
242 // The next op's cost is multiplied by .9 and subtracted from the
243 // max cost seen. Ops with lower costs will have a larger value
244 // and allow them to be selected easier than ops with high costs.
245 if (max_cost
== 0 || rand() % max_cost
<=
246 (max_cost
- ((i
->get_cost() * 9) / 10))) {
254 total_prio
-= i
->key
;
255 queues
.erase_and_dispose(i
, DelItem
<SubQueue
>());
259 void filter_class(K
& cl
, std::list
<T
>* out
) {
260 for (Sit i
= queues
.begin(); i
!= queues
.end();) {
261 size
-= i
->filter_class(cl
, out
);
263 total_prio
-= i
->key
;
264 i
= queues
.erase_and_dispose(i
, DelItem
<SubQueue
>());
270 void dump(ceph::Formatter
*f
) const {
271 for (typename
SubQueues::const_iterator i
= queues
.begin();
272 i
!= queues
.end(); ++i
) {
273 f
->dump_int("total_priority", total_prio
);
274 f
->dump_int("max_cost", max_cost
);
275 f
->open_object_section("subqueue");
276 f
->dump_int("priority", i
->key
);
286 WeightedPriorityQueue(unsigned max_per
, unsigned min_c
) :
292 unsigned length() const final
{
293 return strict
.size
+ normal
.size
;
295 void remove_by_class(K cl
, std::list
<T
>* removed
= 0) final
{
296 strict
.filter_class(cl
, removed
);
297 normal
.filter_class(cl
, removed
);
299 bool empty() const final
{
300 return !(strict
.size
+ normal
.size
);
302 void enqueue_strict(K cl
, unsigned p
, T item
) final
{
303 strict
.insert(p
, cl
, 0, item
);
305 void enqueue_strict_front(K cl
, unsigned p
, T item
) final
{
306 strict
.insert(p
, cl
, 0, item
, true);
308 void enqueue(K cl
, unsigned p
, unsigned cost
, T item
) final
{
309 normal
.insert(p
, cl
, cost
, item
);
311 void enqueue_front(K cl
, unsigned p
, unsigned cost
, T item
) final
{
312 normal
.insert(p
, cl
, cost
, item
, true);
314 T
dequeue() override
{
315 assert(strict
.size
+ normal
.size
> 0);
316 if (!strict
.empty()) {
317 return strict
.pop(true);
321 void dump(ceph::Formatter
*f
) const override
{
322 f
->open_array_section("high_queues");
325 f
->open_array_section("queues");