]>
git.proxmox.com Git - ceph.git/blob - ceph/src/common/mClockPriorityQueue.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 Red Hat Inc.
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
23 #include "common/Formatter.h"
24 #include "common/OpQueue.h"
26 #include "dmclock/src/dmclock_server.h"
28 // the following is done to unclobber _ASSERT_H so it returns to the
30 #include "include/assert.h"
35 namespace dmc
= crimson::dmclock
;
37 template <typename T
, typename K
>
38 class mClockQueue
: public OpQueue
<T
, K
> {
40 using priority_t
= unsigned;
41 using cost_t
= unsigned;
43 typedef std::list
<std::pair
<cost_t
, T
> > ListPairs
;
45 static unsigned filter_list_pairs(ListPairs
*l
,
46 std::function
<bool (const T
&)> f
,
47 std::list
<T
>* out
= nullptr) {
49 for (typename
ListPairs::iterator i
= l
->end();
55 if (f(next
->second
)) {
57 if (out
) out
->push_back(next
->second
);
68 typedef std::map
<K
, ListPairs
> Classes
;
69 // client-class to ordered queue
72 unsigned tokens
, max_tokens
;
75 typename
Classes::iterator cur
;
79 SubQueue(const SubQueue
&other
)
82 max_tokens(other
.max_tokens
),
89 size(0), cur(q
.begin()) {}
91 void set_max_tokens(unsigned mt
) {
95 unsigned get_max_tokens() const {
99 unsigned num_tokens() const {
103 void put_tokens(unsigned t
) {
105 if (tokens
> max_tokens
) {
110 void take_tokens(unsigned t
) {
118 void enqueue(K cl
, cost_t cost
, T item
) {
119 q
[cl
].push_back(std::make_pair(cost
, item
));
125 void enqueue_front(K cl
, cost_t cost
, T item
) {
126 q
[cl
].push_front(std::make_pair(cost
, item
));
132 std::pair
<cost_t
, T
> front() const {
133 assert(!(q
.empty()));
134 assert(cur
!= q
.end());
135 return cur
->second
.front();
139 assert(!(q
.empty()));
140 assert(cur
!= q
.end());
141 cur
->second
.pop_front();
142 if (cur
->second
.empty()) {
149 if (cur
== q
.end()) {
155 unsigned length() const {
157 return (unsigned)size
;
164 void remove_by_filter(std::function
<bool (const T
&)> f
) {
165 for (typename
Classes::iterator i
= q
.begin();
168 size
-= filter_list_pairs(&(i
->second
), f
);
169 if (i
->second
.empty()) {
178 if (cur
== q
.end()) cur
= q
.begin();
181 void remove_by_class(K k
, std::list
<T
> *out
) {
182 typename
Classes::iterator i
= q
.find(k
);
186 size
-= i
->second
.size();
191 for (auto j
= i
->second
.rbegin(); j
!= i
->second
.rend(); ++j
) {
192 out
->push_front(j
->second
);
196 if (cur
== q
.end()) cur
= q
.begin();
199 void dump(ceph::Formatter
*f
) const {
200 f
->dump_int("size", size
);
201 f
->dump_int("num_keys", q
.size());
205 using SubQueues
= std::map
<priority_t
, SubQueue
>;
207 SubQueues high_queue
;
209 dmc::PullPriorityQueue
<K
,T
> queue
;
211 // when enqueue_front is called, rather than try to re-calc tags
212 // to put in mClock priority queue, we'll just keep a separate
213 // list from which we dequeue items first, and only when it's
214 // empty do we use queue.
215 std::list
<std::pair
<K
,T
>> queue_front
;
220 const typename
dmc::PullPriorityQueue
<K
,T
>::ClientInfoFunc
& info_func
) :
221 queue(info_func
, true)
226 unsigned length() const override final
{
228 total
+= queue_front
.size();
229 total
+= queue
.request_count();
230 for (auto i
= high_queue
.cbegin(); i
!= high_queue
.cend(); ++i
) {
231 assert(i
->second
.length());
232 total
+= i
->second
.length();
237 // be sure to do things in reverse priority order and push_front
238 // to the list so items end up on list in front-to-back priority
240 void remove_by_filter(std::function
<bool (const T
&)> filter_accum
) {
241 queue
.remove_by_req_filter(filter_accum
, true);
243 for (auto i
= queue_front
.rbegin(); i
!= queue_front
.rend(); /* no-inc */) {
244 if (filter_accum(i
->second
)) {
245 i
= decltype(i
){ queue_front
.erase(std::next(i
).base()) };
251 for (typename
SubQueues::iterator i
= high_queue
.begin();
252 i
!= high_queue
.end();
254 i
->second
.remove_by_filter(filter_accum
);
255 if (i
->second
.empty()) {
256 i
= high_queue
.erase(i
);
263 void remove_by_class(K k
, std::list
<T
> *out
= nullptr) override final
{
265 queue
.remove_by_client(k
,
267 [&out
] (const T
& t
) { out
->push_front(t
); });
269 queue
.remove_by_client(k
, true);
272 for (auto i
= queue_front
.rbegin(); i
!= queue_front
.rend(); /* no-inc */) {
274 if (nullptr != out
) out
->push_front(i
->second
);
275 i
= decltype(i
){ queue_front
.erase(std::next(i
).base()) };
281 for (auto i
= high_queue
.begin(); i
!= high_queue
.end(); /* no-inc */) {
282 i
->second
.remove_by_class(k
, out
);
283 if (i
->second
.empty()) {
284 i
= high_queue
.erase(i
);
291 void enqueue_strict(K cl
, unsigned priority
, T item
) override final
{
292 high_queue
[priority
].enqueue(cl
, 0, item
);
295 void enqueue_strict_front(K cl
, unsigned priority
, T item
) override final
{
296 high_queue
[priority
].enqueue_front(cl
, 0, item
);
299 void enqueue(K cl
, unsigned priority
, unsigned cost
, T item
) override final
{
300 // priority is ignored
301 queue
.add_request(item
, cl
, cost
);
304 void enqueue_front(K cl
,
307 T item
) override final
{
308 queue_front
.emplace_front(std::pair
<K
,T
>(cl
, item
));
311 bool empty() const override final
{
312 return queue
.empty() && high_queue
.empty() && queue_front
.empty();
315 T
dequeue() override final
{
318 if (!(high_queue
.empty())) {
319 T ret
= high_queue
.rbegin()->second
.front().second
;
320 high_queue
.rbegin()->second
.pop_front();
321 if (high_queue
.rbegin()->second
.empty()) {
322 high_queue
.erase(high_queue
.rbegin()->first
);
327 if (!queue_front
.empty()) {
328 T ret
= queue_front
.front().second
;
329 queue_front
.pop_front();
333 auto pr
= queue
.pull_request();
334 assert(pr
.is_retn());
335 auto& retn
= pr
.get_retn();
336 return *(retn
.request
);
339 void dump(ceph::Formatter
*f
) const override final
{
340 f
->open_array_section("high_queues");
341 for (typename
SubQueues::const_iterator p
= high_queue
.begin();
342 p
!= high_queue
.end();
344 f
->open_object_section("subqueue");
345 f
->dump_int("priority", p
->first
);
351 f
->open_object_section("queue_front");
352 f
->dump_int("size", queue_front
.size());
355 f
->open_object_section("queue");
356 f
->dump_int("size", queue
.request_count());