]>
git.proxmox.com Git - ceph.git/blob - ceph/src/common/mClockPriorityQueue.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 Red Hat Inc.
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
23 #include "common/Formatter.h"
24 #include "common/OpQueue.h"
26 #include "dmclock/src/dmclock_server.h"
28 // the following is done to unclobber _ASSERT_H so it returns to the
30 #include "include/ceph_assert.h"
35 namespace dmc
= crimson::dmclock
;
37 template <typename T
, typename K
>
38 class mClockQueue
: public OpQueue
<T
, K
> {
40 using priority_t
= unsigned;
41 using cost_t
= unsigned;
43 typedef std::list
<std::pair
<cost_t
, T
> > ListPairs
;
45 static void filter_list_pairs(ListPairs
*l
,
46 std::function
<bool (T
&&)> f
) {
47 for (typename
ListPairs::iterator i
= l
->end();
53 if (f(std::move(next
->second
))) {
63 typedef std::map
<K
, ListPairs
> Classes
;
64 // client-class to ordered queue
67 unsigned tokens
, max_tokens
;
69 typename
Classes::iterator cur
;
73 SubQueue(const SubQueue
&other
)
76 max_tokens(other
.max_tokens
),
84 void set_max_tokens(unsigned mt
) {
88 unsigned get_max_tokens() const {
92 unsigned num_tokens() const {
96 void put_tokens(unsigned t
) {
98 if (tokens
> max_tokens
) {
103 void take_tokens(unsigned t
) {
111 void enqueue(K cl
, cost_t cost
, T
&& item
) {
112 q
[cl
].emplace_back(cost
, std::move(item
));
117 void enqueue_front(K cl
, cost_t cost
, T
&& item
) {
118 q
[cl
].emplace_front(cost
, std::move(item
));
123 const std::pair
<cost_t
, T
>& front() const {
124 ceph_assert(!(q
.empty()));
125 ceph_assert(cur
!= q
.end());
126 return cur
->second
.front();
129 std::pair
<cost_t
, T
>& front() {
130 ceph_assert(!(q
.empty()));
131 ceph_assert(cur
!= q
.end());
132 return cur
->second
.front();
136 ceph_assert(!(q
.empty()));
137 ceph_assert(cur
!= q
.end());
138 cur
->second
.pop_front();
139 if (cur
->second
.empty()) {
146 if (cur
== q
.end()) {
151 unsigned get_size_slow() const {
153 for (const auto& cls
: q
) {
154 count
+= cls
.second
.size();
163 void remove_by_filter(std::function
<bool (T
&&)> f
) {
164 for (typename
Classes::iterator i
= q
.begin();
167 filter_list_pairs(&(i
->second
), f
);
168 if (i
->second
.empty()) {
177 if (cur
== q
.end()) cur
= q
.begin();
180 void remove_by_class(K k
, std::list
<T
> *out
) {
181 typename
Classes::iterator i
= q
.find(k
);
189 for (auto j
= i
->second
.rbegin(); j
!= i
->second
.rend(); ++j
) {
190 out
->push_front(std::move(j
->second
));
194 if (cur
== q
.end()) cur
= q
.begin();
197 void dump(ceph::Formatter
*f
) const {
198 f
->dump_int("size", get_size_slow());
199 f
->dump_int("num_keys", q
.size());
203 using SubQueues
= std::map
<priority_t
, SubQueue
>;
205 SubQueues high_queue
;
207 using Queue
= dmc::PullPriorityQueue
<K
,T
,false>;
210 // when enqueue_front is called, rather than try to re-calc tags
211 // to put in mClock priority queue, we'll just keep a separate
212 // list from which we dequeue items first, and only when it's
213 // empty do we use queue.
214 std::list
<std::pair
<K
,T
>> queue_front
;
219 const typename
Queue::ClientInfoFunc
& info_func
,
220 double anticipation_timeout
= 0.0) :
221 queue(info_func
, dmc::AtLimit::Allow
, anticipation_timeout
)
226 unsigned get_size_slow() const {
228 total
+= queue_front
.size();
229 total
+= queue
.request_count();
230 for (auto i
= high_queue
.cbegin(); i
!= high_queue
.cend(); ++i
) {
231 ceph_assert(i
->second
.get_size_slow());
232 total
+= i
->second
.get_size_slow();
237 // be sure to do things in reverse priority order and push_front
238 // to the list so items end up on list in front-to-back priority
240 void remove_by_filter(std::function
<bool (T
&&)> filter_accum
) {
241 queue
.remove_by_req_filter([&] (std::unique_ptr
<T
>&& r
) {
242 return filter_accum(std::move(*r
));
245 for (auto i
= queue_front
.rbegin(); i
!= queue_front
.rend(); /* no-inc */) {
246 if (filter_accum(std::move(i
->second
))) {
247 i
= decltype(i
){ queue_front
.erase(std::next(i
).base()) };
253 for (typename
SubQueues::iterator i
= high_queue
.begin();
254 i
!= high_queue
.end();
256 i
->second
.remove_by_filter(filter_accum
);
257 if (i
->second
.empty()) {
258 i
= high_queue
.erase(i
);
265 void remove_by_class(K k
, std::list
<T
> *out
= nullptr) override final
{
267 queue
.remove_by_client(k
,
269 [&out
] (std::unique_ptr
<T
>&& t
) {
270 out
->push_front(std::move(*t
));
273 queue
.remove_by_client(k
, true);
276 for (auto i
= queue_front
.rbegin(); i
!= queue_front
.rend(); /* no-inc */) {
278 if (nullptr != out
) out
->push_front(std::move(i
->second
));
279 i
= decltype(i
){ queue_front
.erase(std::next(i
).base()) };
285 for (auto i
= high_queue
.begin(); i
!= high_queue
.end(); /* no-inc */) {
286 i
->second
.remove_by_class(k
, out
);
287 if (i
->second
.empty()) {
288 i
= high_queue
.erase(i
);
295 void enqueue_strict(K cl
, unsigned priority
, T
&& item
) override final
{
296 high_queue
[priority
].enqueue(cl
, 1, std::move(item
));
299 void enqueue_strict_front(K cl
, unsigned priority
, T
&& item
) override final
{
300 high_queue
[priority
].enqueue_front(cl
, 1, std::move(item
));
303 void enqueue(K cl
, unsigned priority
, unsigned cost
, T
&& item
) override final
{
304 // priority is ignored
305 queue
.add_request(std::move(item
), cl
, cost
);
308 void enqueue_front(K cl
,
311 T
&& item
) override final
{
312 queue_front
.emplace_front(std::pair
<K
,T
>(cl
, std::move(item
)));
315 bool empty() const override final
{
316 return queue
.empty() && high_queue
.empty() && queue_front
.empty();
319 T
dequeue() override final
{
320 ceph_assert(!empty());
322 if (!high_queue
.empty()) {
323 T ret
= std::move(high_queue
.rbegin()->second
.front().second
);
324 high_queue
.rbegin()->second
.pop_front();
325 if (high_queue
.rbegin()->second
.empty()) {
326 high_queue
.erase(high_queue
.rbegin()->first
);
331 if (!queue_front
.empty()) {
332 T ret
= std::move(queue_front
.front().second
);
333 queue_front
.pop_front();
337 auto pr
= queue
.pull_request();
338 ceph_assert(pr
.is_retn());
339 auto& retn
= pr
.get_retn();
340 return std::move(*(retn
.request
));
343 void dump(ceph::Formatter
*f
) const override final
{
344 f
->open_array_section("high_queues");
345 for (typename
SubQueues::const_iterator p
= high_queue
.begin();
346 p
!= high_queue
.end();
348 f
->open_object_section("subqueue");
349 f
->dump_int("priority", p
->first
);
355 f
->open_object_section("queue_front");
356 f
->dump_int("size", queue_front
.size());
359 f
->open_object_section("queue");
360 f
->dump_int("size", queue
.request_count());