]>
Commit | Line | Data |
---|---|---|
224ce89b WB |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2016 Red Hat Inc. | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #pragma once | |
16 | ||
17 | ||
18 | #include <functional> | |
19 | #include <map> | |
20 | #include <list> | |
21 | #include <cmath> | |
22 | ||
23 | #include "common/Formatter.h" | |
24 | #include "common/OpQueue.h" | |
25 | ||
26 | #include "dmclock/src/dmclock_server.h" | |
27 | ||
28 | // the following is done to unclobber _ASSERT_H so it returns to the | |
29 | // way ceph likes it | |
11fdf7f2 | 30 | #include "include/ceph_assert.h" |
224ce89b WB |
31 | |
32 | ||
33 | namespace ceph { | |
34 | ||
35 | namespace dmc = crimson::dmclock; | |
36 | ||
37 | template <typename T, typename K> | |
38 | class mClockQueue : public OpQueue <T, K> { | |
39 | ||
40 | using priority_t = unsigned; | |
41 | using cost_t = unsigned; | |
42 | ||
43 | typedef std::list<std::pair<cost_t, T> > ListPairs; | |
44 | ||
11fdf7f2 TL |
45 | static void filter_list_pairs(ListPairs *l, |
46 | std::function<bool (T&&)> f) { | |
224ce89b WB |
47 | for (typename ListPairs::iterator i = l->end(); |
48 | i != l->begin(); | |
49 | /* no inc */ | |
50 | ) { | |
51 | auto next = i; | |
52 | --next; | |
11fdf7f2 | 53 | if (f(std::move(next->second))) { |
224ce89b WB |
54 | l->erase(next); |
55 | } else { | |
56 | i = next; | |
57 | } | |
58 | } | |
224ce89b WB |
59 | } |
60 | ||
61 | struct SubQueue { | |
62 | private: | |
63 | typedef std::map<K, ListPairs> Classes; | |
64 | // client-class to ordered queue | |
65 | Classes q; | |
66 | ||
67 | unsigned tokens, max_tokens; | |
224ce89b WB |
68 | |
69 | typename Classes::iterator cur; | |
70 | ||
71 | public: | |
72 | ||
73 | SubQueue(const SubQueue &other) | |
74 | : q(other.q), | |
75 | tokens(other.tokens), | |
76 | max_tokens(other.max_tokens), | |
224ce89b WB |
77 | cur(q.begin()) {} |
78 | ||
79 | SubQueue() | |
80 | : tokens(0), | |
81 | max_tokens(0), | |
11fdf7f2 | 82 | cur(q.begin()) {} |
224ce89b WB |
83 | |
84 | void set_max_tokens(unsigned mt) { | |
85 | max_tokens = mt; | |
86 | } | |
87 | ||
88 | unsigned get_max_tokens() const { | |
89 | return max_tokens; | |
90 | } | |
91 | ||
92 | unsigned num_tokens() const { | |
93 | return tokens; | |
94 | } | |
95 | ||
96 | void put_tokens(unsigned t) { | |
97 | tokens += t; | |
98 | if (tokens > max_tokens) { | |
99 | tokens = max_tokens; | |
100 | } | |
101 | } | |
102 | ||
103 | void take_tokens(unsigned t) { | |
104 | if (tokens > t) { | |
105 | tokens -= t; | |
106 | } else { | |
107 | tokens = 0; | |
108 | } | |
109 | } | |
110 | ||
11fdf7f2 TL |
111 | void enqueue(K cl, cost_t cost, T&& item) { |
112 | q[cl].emplace_back(cost, std::move(item)); | |
224ce89b WB |
113 | if (cur == q.end()) |
114 | cur = q.begin(); | |
224ce89b WB |
115 | } |
116 | ||
11fdf7f2 TL |
117 | void enqueue_front(K cl, cost_t cost, T&& item) { |
118 | q[cl].emplace_front(cost, std::move(item)); | |
224ce89b WB |
119 | if (cur == q.end()) |
120 | cur = q.begin(); | |
224ce89b WB |
121 | } |
122 | ||
11fdf7f2 TL |
123 | const std::pair<cost_t, T>& front() const { |
124 | ceph_assert(!(q.empty())); | |
125 | ceph_assert(cur != q.end()); | |
126 | return cur->second.front(); | |
127 | } | |
128 | ||
129 | std::pair<cost_t, T>& front() { | |
130 | ceph_assert(!(q.empty())); | |
131 | ceph_assert(cur != q.end()); | |
224ce89b WB |
132 | return cur->second.front(); |
133 | } | |
134 | ||
135 | void pop_front() { | |
11fdf7f2 TL |
136 | ceph_assert(!(q.empty())); |
137 | ceph_assert(cur != q.end()); | |
224ce89b WB |
138 | cur->second.pop_front(); |
139 | if (cur->second.empty()) { | |
140 | auto i = cur; | |
141 | ++cur; | |
142 | q.erase(i); | |
143 | } else { | |
144 | ++cur; | |
145 | } | |
146 | if (cur == q.end()) { | |
147 | cur = q.begin(); | |
148 | } | |
224ce89b WB |
149 | } |
150 | ||
11fdf7f2 TL |
151 | unsigned get_size_slow() const { |
152 | unsigned count = 0; | |
153 | for (const auto& cls : q) { | |
154 | count += cls.second.size(); | |
155 | } | |
156 | return count; | |
224ce89b WB |
157 | } |
158 | ||
159 | bool empty() const { | |
160 | return q.empty(); | |
161 | } | |
162 | ||
11fdf7f2 | 163 | void remove_by_filter(std::function<bool (T&&)> f) { |
224ce89b WB |
164 | for (typename Classes::iterator i = q.begin(); |
165 | i != q.end(); | |
166 | /* no-inc */) { | |
11fdf7f2 | 167 | filter_list_pairs(&(i->second), f); |
224ce89b WB |
168 | if (i->second.empty()) { |
169 | if (cur == i) { | |
170 | ++cur; | |
171 | } | |
172 | i = q.erase(i); | |
173 | } else { | |
174 | ++i; | |
175 | } | |
176 | } | |
177 | if (cur == q.end()) cur = q.begin(); | |
178 | } | |
179 | ||
180 | void remove_by_class(K k, std::list<T> *out) { | |
181 | typename Classes::iterator i = q.find(k); | |
182 | if (i == q.end()) { | |
183 | return; | |
184 | } | |
224ce89b WB |
185 | if (i == cur) { |
186 | ++cur; | |
187 | } | |
188 | if (out) { | |
189 | for (auto j = i->second.rbegin(); j != i->second.rend(); ++j) { | |
11fdf7f2 | 190 | out->push_front(std::move(j->second)); |
224ce89b WB |
191 | } |
192 | } | |
193 | q.erase(i); | |
194 | if (cur == q.end()) cur = q.begin(); | |
195 | } | |
196 | ||
197 | void dump(ceph::Formatter *f) const { | |
11fdf7f2 | 198 | f->dump_int("size", get_size_slow()); |
224ce89b WB |
199 | f->dump_int("num_keys", q.size()); |
200 | } | |
201 | }; | |
202 | ||
203 | using SubQueues = std::map<priority_t, SubQueue>; | |
204 | ||
205 | SubQueues high_queue; | |
206 | ||
11fdf7f2 TL |
207 | using Queue = dmc::PullPriorityQueue<K,T,false>; |
208 | Queue queue; | |
224ce89b WB |
209 | |
210 | // when enqueue_front is called, rather than try to re-calc tags | |
211 | // to put in mClock priority queue, we'll just keep a separate | |
212 | // list from which we dequeue items first, and only when it's | |
213 | // empty do we use queue. | |
214 | std::list<std::pair<K,T>> queue_front; | |
215 | ||
216 | public: | |
217 | ||
218 | mClockQueue( | |
11fdf7f2 TL |
219 | const typename Queue::ClientInfoFunc& info_func, |
220 | double anticipation_timeout = 0.0) : | |
221 | queue(info_func, dmc::AtLimit::Allow, anticipation_timeout) | |
224ce89b WB |
222 | { |
223 | // empty | |
224 | } | |
225 | ||
11fdf7f2 | 226 | unsigned get_size_slow() const { |
224ce89b WB |
227 | unsigned total = 0; |
228 | total += queue_front.size(); | |
229 | total += queue.request_count(); | |
230 | for (auto i = high_queue.cbegin(); i != high_queue.cend(); ++i) { | |
11fdf7f2 TL |
231 | ceph_assert(i->second.get_size_slow()); |
232 | total += i->second.get_size_slow(); | |
224ce89b WB |
233 | } |
234 | return total; | |
235 | } | |
236 | ||
237 | // be sure to do things in reverse priority order and push_front | |
238 | // to the list so items end up on list in front-to-back priority | |
239 | // order | |
11fdf7f2 TL |
240 | void remove_by_filter(std::function<bool (T&&)> filter_accum) { |
241 | queue.remove_by_req_filter([&] (std::unique_ptr<T>&& r) { | |
242 | return filter_accum(std::move(*r)); | |
243 | }, true); | |
224ce89b WB |
244 | |
245 | for (auto i = queue_front.rbegin(); i != queue_front.rend(); /* no-inc */) { | |
11fdf7f2 | 246 | if (filter_accum(std::move(i->second))) { |
224ce89b WB |
247 | i = decltype(i){ queue_front.erase(std::next(i).base()) }; |
248 | } else { | |
249 | ++i; | |
250 | } | |
251 | } | |
252 | ||
253 | for (typename SubQueues::iterator i = high_queue.begin(); | |
254 | i != high_queue.end(); | |
255 | /* no-inc */ ) { | |
256 | i->second.remove_by_filter(filter_accum); | |
257 | if (i->second.empty()) { | |
258 | i = high_queue.erase(i); | |
259 | } else { | |
260 | ++i; | |
261 | } | |
262 | } | |
263 | } | |
264 | ||
265 | void remove_by_class(K k, std::list<T> *out = nullptr) override final { | |
266 | if (out) { | |
267 | queue.remove_by_client(k, | |
268 | true, | |
11fdf7f2 TL |
269 | [&out] (std::unique_ptr<T>&& t) { |
270 | out->push_front(std::move(*t)); | |
271 | }); | |
224ce89b WB |
272 | } else { |
273 | queue.remove_by_client(k, true); | |
274 | } | |
275 | ||
276 | for (auto i = queue_front.rbegin(); i != queue_front.rend(); /* no-inc */) { | |
277 | if (k == i->first) { | |
11fdf7f2 | 278 | if (nullptr != out) out->push_front(std::move(i->second)); |
224ce89b WB |
279 | i = decltype(i){ queue_front.erase(std::next(i).base()) }; |
280 | } else { | |
281 | ++i; | |
282 | } | |
283 | } | |
284 | ||
285 | for (auto i = high_queue.begin(); i != high_queue.end(); /* no-inc */) { | |
286 | i->second.remove_by_class(k, out); | |
287 | if (i->second.empty()) { | |
288 | i = high_queue.erase(i); | |
289 | } else { | |
290 | ++i; | |
291 | } | |
292 | } | |
293 | } | |
294 | ||
11fdf7f2 TL |
295 | void enqueue_strict(K cl, unsigned priority, T&& item) override final { |
296 | high_queue[priority].enqueue(cl, 1, std::move(item)); | |
224ce89b WB |
297 | } |
298 | ||
11fdf7f2 TL |
299 | void enqueue_strict_front(K cl, unsigned priority, T&& item) override final { |
300 | high_queue[priority].enqueue_front(cl, 1, std::move(item)); | |
224ce89b WB |
301 | } |
302 | ||
11fdf7f2 | 303 | void enqueue(K cl, unsigned priority, unsigned cost, T&& item) override final { |
224ce89b | 304 | // priority is ignored |
d2e6a577 | 305 | queue.add_request(std::move(item), cl, cost); |
224ce89b WB |
306 | } |
307 | ||
308 | void enqueue_front(K cl, | |
309 | unsigned priority, | |
310 | unsigned cost, | |
11fdf7f2 TL |
311 | T&& item) override final { |
312 | queue_front.emplace_front(std::pair<K,T>(cl, std::move(item))); | |
224ce89b WB |
313 | } |
314 | ||
315 | bool empty() const override final { | |
316 | return queue.empty() && high_queue.empty() && queue_front.empty(); | |
317 | } | |
318 | ||
319 | T dequeue() override final { | |
11fdf7f2 | 320 | ceph_assert(!empty()); |
224ce89b | 321 | |
11fdf7f2 TL |
322 | if (!high_queue.empty()) { |
323 | T ret = std::move(high_queue.rbegin()->second.front().second); | |
224ce89b WB |
324 | high_queue.rbegin()->second.pop_front(); |
325 | if (high_queue.rbegin()->second.empty()) { | |
326 | high_queue.erase(high_queue.rbegin()->first); | |
327 | } | |
328 | return ret; | |
329 | } | |
330 | ||
331 | if (!queue_front.empty()) { | |
11fdf7f2 | 332 | T ret = std::move(queue_front.front().second); |
224ce89b WB |
333 | queue_front.pop_front(); |
334 | return ret; | |
335 | } | |
336 | ||
337 | auto pr = queue.pull_request(); | |
11fdf7f2 | 338 | ceph_assert(pr.is_retn()); |
224ce89b | 339 | auto& retn = pr.get_retn(); |
11fdf7f2 | 340 | return std::move(*(retn.request)); |
224ce89b WB |
341 | } |
342 | ||
343 | void dump(ceph::Formatter *f) const override final { | |
344 | f->open_array_section("high_queues"); | |
345 | for (typename SubQueues::const_iterator p = high_queue.begin(); | |
346 | p != high_queue.end(); | |
347 | ++p) { | |
348 | f->open_object_section("subqueue"); | |
349 | f->dump_int("priority", p->first); | |
350 | p->second.dump(f); | |
351 | f->close_section(); | |
352 | } | |
353 | f->close_section(); | |
354 | ||
355 | f->open_object_section("queue_front"); | |
356 | f->dump_int("size", queue_front.size()); | |
357 | f->close_section(); | |
358 | ||
359 | f->open_object_section("queue"); | |
360 | f->dump_int("size", queue.request_count()); | |
361 | f->close_section(); | |
362 | } // dump | |
9f95a23c TL |
363 | |
364 | void print(std::ostream &os) const final { | |
365 | os << "mClockPriorityQueue"; | |
366 | } | |
224ce89b WB |
367 | }; |
368 | ||
369 | } // namespace ceph |