]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | /* | |
5 | * Copyright (C) 2017 Red Hat Inc. | |
11fdf7f2 TL |
6 | * |
7 | * Author: J. Eric Ivancich <ivancich@redhat.com> | |
8 | * | |
9 | * This is free software; you can redistribute it and/or modify it | |
10 | * under the terms of the GNU Lesser General Public License version | |
11 | * 2.1, as published by the Free Software Foundation. See file | |
12 | * COPYING. | |
7c673cae FG |
13 | */ |
14 | ||
15 | ||
16 | #pragma once | |
17 | ||
18 | /* COMPILATION OPTIONS | |
7c673cae FG |
19 | * |
20 | * The prop_heap does not seem to be necessary. The only thing it | |
11fdf7f2 | 21 | * would help with is quickly finding the minimum proportion/priority |
7c673cae FG |
22 | * when an idle client became active. To have the code maintain the |
23 | * proportional heap, define USE_PROP_HEAP (i.e., compiler argument | |
24 | * -DUSE_PROP_HEAP). | |
25 | */ | |
26 | ||
27 | #include <assert.h> | |
28 | ||
29 | #include <cmath> | |
30 | #include <memory> | |
31 | #include <map> | |
32 | #include <deque> | |
33 | #include <queue> | |
20effc67 | 34 | #ifndef WITH_SEASTAR |
7c673cae FG |
35 | #include <atomic> |
36 | #include <mutex> | |
37 | #include <condition_variable> | |
38 | #include <thread> | |
20effc67 | 39 | #endif |
7c673cae FG |
40 | #include <iostream> |
41 | #include <sstream> | |
42 | #include <limits> | |
43 | ||
44 | #include <boost/variant.hpp> | |
45 | ||
46 | #include "indirect_intrusive_heap.h" | |
9f95a23c | 47 | #include "../support/src/run_every.h" |
7c673cae FG |
48 | #include "dmclock_util.h" |
49 | #include "dmclock_recs.h" | |
50 | ||
51 | #ifdef PROFILE | |
52 | #include "profile.h" | |
53 | #endif | |
54 | ||
7c673cae FG |
55 | |
56 | namespace crimson { | |
57 | ||
58 | namespace dmclock { | |
59 | ||
60 | namespace c = crimson; | |
61 | ||
    // Sentinel tag values: use +/- infinity when IEEE-754 floating point
    // is available, otherwise fall back to the extreme finite doubles.
    constexpr double max_tag = std::numeric_limits<double>::is_iec559 ?
      std::numeric_limits<double>::infinity() :
      std::numeric_limits<double>::max();
    constexpr double min_tag = std::numeric_limits<double>::is_iec559 ?
      -std::numeric_limits<double>::infinity() :
      std::numeric_limits<double>::lowest();
    // modulus used when formatting tag values for display (see
    // RequestTag::format_tag / format_time)
    constexpr unsigned tag_modulo = 1000000;

    // default ages and check intervals for the background job that
    // marks clients idle and eventually erases their records
    constexpr auto standard_idle_age = std::chrono::seconds(300);
    constexpr auto standard_erase_age = std::chrono::seconds(600);
    constexpr auto standard_check_time = std::chrono::seconds(60);
    constexpr auto aggressive_check_time = std::chrono::seconds(5);
    // max number of client records erased per cleaning pass
    constexpr unsigned standard_erase_max = 2000;
11fdf7f2 TL |
75 | |
    // Policy for what happens when serving a request would push a
    // client past its limit tag.
    enum class AtLimit {
      // requests are delayed until the limit is restored
      Wait,
      // requests are allowed to exceed their limit, if all other reservations
      // are met and below their limits
      Allow,
      // if an incoming request would exceed its limit, add_request() will
      // reject it with EAGAIN instead of adding it to the queue. cannot be used
      // with DelayedTagCalc, because add_request() needs an accurate tag
      Reject,
    };

    // when AtLimit::Reject is used, only start rejecting requests once their
    // limit is above this threshold. requests under this threshold are
    // enqueued and processed like AtLimit::Wait
    using RejectThreshold = Time;

    // the AtLimit constructor parameter can either accept AtLimit or a value
    // for RejectThreshold (which implies AtLimit::Reject)
    using AtLimitParam = boost::variant<AtLimit, RejectThreshold>;
96 | ||
7c673cae | 97 | struct ClientInfo { |
11fdf7f2 TL |
98 | double reservation; // minimum |
99 | double weight; // proportional | |
100 | double limit; // maximum | |
7c673cae FG |
101 | |
102 | // multiplicative inverses of above, which we use in calculations | |
103 | // and don't want to recalculate repeatedly | |
11fdf7f2 TL |
104 | double reservation_inv; |
105 | double weight_inv; | |
106 | double limit_inv; | |
7c673cae FG |
107 | |
108 | // order parameters -- min, "normal", max | |
9f95a23c TL |
109 | ClientInfo(double _reservation, double _weight, double _limit) { |
110 | update(_reservation, _weight, _limit); | |
111 | } | |
112 | ||
113 | inline void update(double _reservation, double _weight, double _limit) { | |
114 | reservation = _reservation; | |
115 | weight = _weight; | |
116 | limit = _limit; | |
117 | reservation_inv = (0.0 == reservation) ? 0.0 : 1.0 / reservation; | |
118 | weight_inv = (0.0 == weight) ? 0.0 : 1.0 / weight; | |
119 | limit_inv = (0.0 == limit) ? 0.0 : 1.0 / limit; | |
7c673cae | 120 | } |
7c673cae FG |
121 | |
122 | friend std::ostream& operator<<(std::ostream& out, | |
123 | const ClientInfo& client) { | |
124 | out << | |
125 | "{ ClientInfo:: r:" << client.reservation << | |
126 | " w:" << std::fixed << client.weight << | |
127 | " l:" << std::fixed << client.limit << | |
128 | " 1/r:" << std::fixed << client.reservation_inv << | |
129 | " 1/w:" << std::fixed << client.weight_inv << | |
130 | " 1/l:" << std::fixed << client.limit_inv << | |
131 | " }"; | |
132 | return out; | |
133 | } | |
134 | }; // class ClientInfo | |
135 | ||
136 | ||
    // The scheduling tags attached to each request per the dmClock
    // algorithm: a reservation tag, a proportion (weight) tag and a
    // limit tag, plus the rho/delta counts, the request's cost, its
    // readiness (within limit) and arrival time.
    struct RequestTag {
      double reservation;
      double proportion;
      double limit;
      uint32_t delta;
      uint32_t rho;
      Cost cost;
      bool ready; // true when within limit
      Time arrival;

      // Compute fresh tags from the client's previous tag and its
      // current parameters. If this request arrived within
      // anticipation_timeout of the previous one, the calculation time
      // is backdated by that amount (mitigates penalizing briefly-idle
      // clients).
      RequestTag(const RequestTag& prev_tag,
                 const ClientInfo& client,
                 const uint32_t _delta,
                 const uint32_t _rho,
                 const Time time,
                 const Cost _cost = 1u,
                 const double anticipation_timeout = 0.0) :
        delta(_delta),
        rho(_rho),
        cost(_cost),
        ready(false),
        arrival(time)
      {
        assert(cost > 0);
        Time max_time = time;
        if (time - anticipation_timeout < prev_tag.arrival)
          max_time -= anticipation_timeout;

        // reservation advances by rho; proportion and limit advance by
        // delta (see the dmClock tag formulas)
        reservation = tag_calc(max_time,
                               prev_tag.reservation,
                               client.reservation_inv,
                               rho,
                               true,
                               cost);
        proportion = tag_calc(max_time,
                              prev_tag.proportion,
                              client.weight_inv,
                              delta,
                              true,
                              cost);
        limit = tag_calc(max_time,
                         prev_tag.limit,
                         client.limit_inv,
                         delta,
                         false,
                         cost);

        // at least one of reservation or proportion must be in force
        assert(reservation < max_tag || proportion < max_tag);
      }

      // Convenience overload taking rho/delta bundled in ReqParams.
      RequestTag(const RequestTag& prev_tag,
                 const ClientInfo& client,
                 const ReqParams req_params,
                 const Time time,
                 const Cost cost = 1u,
                 const double anticipation_timeout = 0.0) :
        RequestTag(prev_tag, client, req_params.delta, req_params.rho, time,
                   cost, anticipation_timeout)
      { /* empty */ }

      // Construct a tag directly from explicit values (e.g. a zeroed
      // initial tag).
      RequestTag(const double _res, const double _prop, const double _lim,
                 const Time _arrival,
                 const uint32_t _delta = 0,
                 const uint32_t _rho = 0,
                 const Cost _cost = 1u) :
        reservation(_res),
        proportion(_prop),
        limit(_lim),
        delta(_delta),
        rho(_rho),
        cost(_cost),
        ready(false),
        arrival(_arrival)
      {
        assert(cost > 0);
        assert(reservation < max_tag || proportion < max_tag);
      }

      RequestTag(const RequestTag& other) :
        reservation(other.reservation),
        proportion(other.proportion),
        limit(other.limit),
        delta(other.delta),
        rho(other.rho),
        cost(other.cost),
        ready(other.ready),
        arrival(other.arrival)
      { /* empty */ }

      // Render "same" when unchanged, otherwise "before=>after".
      static std::string format_tag_change(double before, double after) {
        if (before == after) {
          return std::string("same");
        } else {
          std::stringstream ss;
          ss << format_tag(before) << "=>" << format_tag(after);
          return ss.str();
        }
      }

      // Render a tag value; the sentinel extremes print symbolically.
      static std::string format_tag(double value) {
        if (max_tag == value) {
          return std::string("max");
        } else if (min_tag == value) {
          return std::string("min");
        } else {
          return format_time(value, tag_modulo);
        }
      }

    private:

      // Core tag formula: prev + (dist_req_val + cost) * increment,
      // floored at the current time. A zero increment means the
      // corresponding parameter is not in force, so the tag is pinned
      // to the appropriate extreme (max for reservation/proportion,
      // min for limit).
      static double tag_calc(const Time time,
                             const double prev,
                             const double increment,
                             const uint32_t dist_req_val,
                             const bool extreme_is_high,
                             const Cost cost) {
        if (0.0 == increment) {
          return extreme_is_high ? max_tag : min_tag;
        } else {
          // insure 64-bit arithmetic before conversion to double
          double tag_increment = increment * (uint64_t(dist_req_val) + cost);
          return std::max(time, prev + tag_increment);
        }
      }

      friend std::ostream& operator<<(std::ostream& out,
                                      const RequestTag& tag) {
        out <<
          "{ RequestTag:: ready:" << (tag.ready ? "true" : "false") <<
          " r:" << format_tag(tag.reservation) <<
          " p:" << format_tag(tag.proportion) <<
          " l:" << format_tag(tag.limit) <<
#if 0 // try to resolve this to make sure Time is operator<<'able.
          " arrival:" << tag.arrival <<
#endif
          " }";
        return out;
      }
    }; // class RequestTag
277 | ||
11fdf7f2 TL |
278 | // C is client identifier type, R is request type, |
279 | // IsDelayed controls whether tag calculation is delayed until the request | |
280 | // reaches the front of its queue. This is an optimization over the | |
281 | // originally published dmclock algorithm, allowing it to use the most | |
282 | // recent values of rho and delta. | |
283 | // U1 determines whether to use client information function dynamically, | |
284 | // B is heap branching factor | |
9f95a23c | 285 | template<typename C, typename R, bool IsDelayed, bool U1, unsigned B> |
7c673cae | 286 | class PriorityQueueBase { |
d2e6a577 FG |
287 | // we don't want to include gtest.h just for FRIEND_TEST |
288 | friend class dmclock_server_client_idle_erase_Test; | |
20effc67 | 289 | friend class dmclock_server_add_req_pushprio_queue_Test; |
7c673cae | 290 | |
11fdf7f2 TL |
      // types used for tag dispatch to select between implementations
      using TagCalc = std::integral_constant<bool, IsDelayed>;
      using DelayedTagCalc = std::true_type;
      using ImmediateTagCalc = std::false_type;

    public:

      // requests are held and handed back via owning pointers
      using RequestRef = std::unique_ptr<R>;

    protected:

      using Clock = std::chrono::steady_clock;
      using TimePoint = Clock::time_point;
      using Duration = std::chrono::milliseconds;
      // (time, tick) pair recorded by the cleaning logic
      using MarkPoint = std::pair<TimePoint,Counter>;

      // how the per-request "ready" flag influences heap ordering
      enum class ReadyOption {ignore, lowers, raises};

      // forward decl for friend decls
      template<double RequestTag::*, ReadyOption, bool>
      struct ClientCompare;
312 | ||
      // A queued request: its scheduling tags, the owning client's id,
      // and the request payload itself.
      class ClientReq {
        friend PriorityQueueBase;

        RequestTag tag;
        C client_id;
        RequestRef request;

      public:

        ClientReq(const RequestTag& _tag,
                  const C& _client_id,
                  RequestRef&& _request) :
          tag(_tag),
          client_id(_client_id),
          request(std::move(_request))
        {
          // empty
        }

        friend std::ostream& operator<<(std::ostream& out, const ClientReq& c) {
          out << "{ ClientReq:: tag:" << c.tag << " client:" <<
            c.client_id << " }";
          return out;
        }
      }; // class ClientReq
338 | ||
11fdf7f2 TL |
      // Pairs a request's client id with its tag; used when a request
      // is handed out together with its scheduling metadata.
      struct RequestMeta {
        C client_id;
        RequestTag tag;

        RequestMeta(const C& _client_id, const RequestTag& _tag) :
          client_id(_client_id),
          tag(_tag)
        {
          // empty
        }
      };
350 | ||
7c673cae FG |
    public:

      // NOTE: ClientRec is in the "public" section for compatibility
      // with g++ 4.8.4, which complains if it's not. By g++ 6.3.1
      // ClientRec could be "protected" with no issue. [See comments
      // associated with function submit_top_request.]
      //
      // Per-client record: the client's queue of pending requests, the
      // previous tag (basis for the next tag calculation), idle
      // bookkeeping, and the intrusive-heap hook data for each heap
      // this record participates in.
      class ClientRec {
        friend PriorityQueueBase<C,R,IsDelayed,U1,B>;

        C client;
        RequestTag prev_tag;
        std::deque<ClientReq> requests;

        // amount added from the proportion tag as a result of
        // an idle client becoming unidle
        double prop_delta = 0.0;

        // hook data for the intrusive heaps below
        c::IndIntruHeapData reserv_heap_data {};
        c::IndIntruHeapData lim_heap_data {};
        c::IndIntruHeapData ready_heap_data {};
#if USE_PROP_HEAP
        c::IndIntruHeapData prop_heap_data {};
#endif

      public:

        const ClientInfo* info;
        bool idle;          // starts true; idle state drives cleaning/erasure
        Counter last_tick;  // tick at the last tag update
        uint32_t cur_rho;
        uint32_t cur_delta;

        ClientRec(C _client,
                  const ClientInfo* _info,
                  Counter current_tick) :
          client(_client),
          prev_tag(0.0, 0.0, 0.0, TimeZero),
          info(_info),
          idle(true),
          last_tick(current_tick),
          cur_rho(1),
          cur_delta(1)
        {
          // empty
        }

        inline const RequestTag& get_req_tag() const {
          return prev_tag;
        }

        // copy rhs into lhs unless rhs is pinned at a sentinel extreme
        static inline void assign_unpinned_tag(double& lhs, const double rhs) {
          if (rhs != max_tag && rhs != min_tag) {
            lhs = rhs;
          }
        }

        inline void update_req_tag(const RequestTag& _prev,
                                   const Counter& _tick) {
          assign_unpinned_tag(prev_tag.reservation, _prev.reservation);
          assign_unpinned_tag(prev_tag.limit, _prev.limit);
          assign_unpinned_tag(prev_tag.proportion, _prev.proportion);
          prev_tag.arrival = _prev.arrival;
          last_tick = _tick;
        }

        inline void add_request(const RequestTag& tag, RequestRef&& request) {
          requests.emplace_back(tag, client, std::move(request));
        }

        inline const ClientReq& next_request() const {
          return requests.front();
        }

        inline ClientReq& next_request() {
          return requests.front();
        }

        inline void pop_request() {
          requests.pop_front();
        }

        inline bool has_request() const {
          return !requests.empty();
        }

        inline size_t request_count() const {
          return requests.size();
        }

        // NB: because a deque is the underlying structure, this
        // operation might be expensive
        bool remove_by_req_filter_fw(std::function<bool(RequestRef&&)> filter_accum) {
          bool any_removed = false;
          for (auto i = requests.begin();
               i != requests.end();
               /* no inc */) {
            if (filter_accum(std::move(i->request))) {
              any_removed = true;
              i = requests.erase(i);
            } else {
              ++i;
            }
          }
          return any_removed;
        }

        // NB: because a deque is the underlying structure, this
        // operation might be expensive
        bool remove_by_req_filter_bw(std::function<bool(RequestRef&&)> filter_accum) {
          bool any_removed = false;
          for (auto i = requests.rbegin();
               i != requests.rend();
               /* no inc */) {
            if (filter_accum(std::move(i->request))) {
              any_removed = true;
              // erase via the base() of the reverse iterator, then
              // rebuild the reverse iterator from the result
              i = decltype(i){ requests.erase(std::next(i).base()) };
            } else {
              ++i;
            }
          }
          return any_removed;
        }

        inline bool
        remove_by_req_filter(std::function<bool(RequestRef&&)> filter_accum,
                             bool visit_backwards) {
          if (visit_backwards) {
            return remove_by_req_filter_bw(filter_accum);
          } else {
            return remove_by_req_filter_fw(filter_accum);
          }
        }

        friend std::ostream&
        operator<<(std::ostream& out,
                   const typename PriorityQueueBase::ClientRec& e) {
          out << "{ ClientRec::" <<
            " client:" << e.client <<
            " prev_tag:" << e.prev_tag <<
            " req_count:" << e.requests.size() <<
            " top_req:";
          if (e.has_request()) {
            out << e.next_request();
          } else {
            out << "none";
          }
          out << " }";

          return out;
        }
      }; // class ClientRec
502 | ||
      using ClientRecRef = std::shared_ptr<ClientRec>;

      // when we try to get the next request, we'll be in one of three
      // situations -- we'll have one to return, have one that can
      // fire in the future, or not have any
      enum class NextReqType { returning, future, none };

      // specifies which queue next request will get popped from
      enum class HeapId { reservation, ready };

      // this is returned from next_req to tell the caller the situation
      struct NextReq {
        NextReqType type;
        // active union member depends on type: heap_id when returning,
        // when_ready when future, neither when none
        union {
          HeapId heap_id;
          Time when_ready;
        };

        inline explicit NextReq() :
          type(NextReqType::none)
        { }

        inline NextReq(HeapId _heap_id) :
          type(NextReqType::returning),
          heap_id(_heap_id)
        { }

        inline NextReq(Time _when_ready) :
          type(NextReqType::future),
          when_ready(_when_ready)
        { }

        // calls to this are clearer than calls to the default
        // constructor
        static inline NextReq none() {
          return NextReq();
        }
      };
541 | ||
542 | ||
      // a function that can be called to look up client information
      using ClientInfoFunc = std::function<const ClientInfo*(const C&)>;

      // true when no client has a queued request
      bool empty() const {
        DataGuard g(data_mtx);
        return (resv_heap.empty() || ! resv_heap.top().has_request());
      }

      // number of client records currently tracked (including idle ones)
      size_t client_count() const {
        DataGuard g(data_mtx);
        return resv_heap.size();
      }

      // total number of queued requests across all clients
      size_t request_count() const {
        DataGuard g(data_mtx);
        size_t total = 0;
        for (auto i = resv_heap.cbegin(); i != resv_heap.cend(); ++i) {
          total += i->request_count();
        }
        return total;
      }
567 | ||
568 | ||
11fdf7f2 | 569 | bool remove_by_req_filter(std::function<bool(RequestRef&&)> filter_accum, |
7c673cae FG |
570 | bool visit_backwards = false) { |
571 | bool any_removed = false; | |
572 | DataGuard g(data_mtx); | |
573 | for (auto i : client_map) { | |
574 | bool modified = | |
575 | i.second->remove_by_req_filter(filter_accum, visit_backwards); | |
576 | if (modified) { | |
577 | resv_heap.adjust(*i.second); | |
578 | limit_heap.adjust(*i.second); | |
579 | ready_heap.adjust(*i.second); | |
580 | #if USE_PROP_HEAP | |
581 | prop_heap.adjust(*i.second); | |
582 | #endif | |
583 | any_removed = true; | |
584 | } | |
585 | } | |
586 | return any_removed; | |
587 | } | |
588 | ||
589 | ||
      // use as a default value when no accumulator is provided
      static void request_sink(RequestRef&& req) {
        // do nothing -- the request is simply discarded
      }
594 | ||
595 | ||
      // Remove all queued requests belonging to one client, handing
      // each removed request to accum (front-to-back by default,
      // back-to-front when reverse is true), then re-adjust the heaps.
      void remove_by_client(const C& client,
                            bool reverse = false,
                            std::function<void (RequestRef&&)> accum = request_sink) {
        DataGuard g(data_mtx);

        auto i = client_map.find(client);

        // unknown client: nothing to do
        if (i == client_map.end()) return;

        if (reverse) {
          for (auto j = i->second->requests.rbegin();
               j != i->second->requests.rend();
               ++j) {
            accum(std::move(j->request));
          }
        } else {
          for (auto j = i->second->requests.begin();
               j != i->second->requests.end();
               ++j) {
            accum(std::move(j->request));
          }
        }

        i->second->requests.clear();

        resv_heap.adjust(*i->second);
        limit_heap.adjust(*i->second);
        ready_heap.adjust(*i->second);
#if USE_PROP_HEAP
        prop_heap.adjust(*i->second);
#endif
      }
628 | ||
629 | ||
9f95a23c | 630 | unsigned get_heap_branching_factor() const { |
7c673cae FG |
631 | return B; |
632 | } | |
633 | ||
634 | ||
11fdf7f2 TL |
635 | void update_client_info(const C& client_id) { |
636 | DataGuard g(data_mtx); | |
637 | auto client_it = client_map.find(client_id); | |
638 | if (client_map.end() != client_it) { | |
639 | ClientRec& client = (*client_it->second); | |
640 | client.info = client_info_f(client_id); | |
641 | } | |
642 | } | |
643 | ||
644 | ||
645 | void update_client_infos() { | |
646 | DataGuard g(data_mtx); | |
647 | for (auto i : client_map) { | |
648 | i.second->info = client_info_f(i.second->client); | |
649 | } | |
650 | } | |
651 | ||
652 | ||
7c673cae FG |
      // dump the whole queue: every client record plus the current top
      // of each heap (for debugging/logging)
      friend std::ostream& operator<<(std::ostream& out,
                                      const PriorityQueueBase& q) {
        std::lock_guard<decltype(q.data_mtx)> guard(q.data_mtx);

        out << "{ PriorityQueue::";
        for (const auto& c : q.client_map) {
          out << " { client:" << c.first << ", record:" << *c.second <<
            " }";
        }
        if (!q.resv_heap.empty()) {
          const auto& resv = q.resv_heap.top();
          out << " { reservation_top:" << resv << " }";
          const auto& ready = q.ready_heap.top();
          out << " { ready_top:" << ready << " }";
          const auto& limit = q.limit_heap.top();
          out << " { limit_top:" << limit << " }";
        } else {
          out << " HEAPS-EMPTY";
        }
        out << " }";

        return out;
      }
676 | ||
      // for debugging: print each heap in sorted order, selectable via
      // the show_* flags
      void display_queues(std::ostream& out,
                          bool show_res = true,
                          bool show_lim = true,
                          bool show_ready = true,
                          bool show_prop = true) const {
        auto filter = [](const ClientRec& e)->bool { return true; };
        DataGuard g(data_mtx);
        if (show_res) {
          resv_heap.display_sorted(out << "RESER:", filter);
        }
        if (show_lim) {
          limit_heap.display_sorted(out << "LIMIT:", filter);
        }
        if (show_ready) {
          ready_heap.display_sorted(out << "READY:", filter);
        }
#if USE_PROP_HEAP
        if (show_prop) {
          prop_heap.display_sorted(out << "PROPO:", filter);
        }
#endif
      } // display_queues
700 | ||
701 | ||
    protected:

      // The ClientCompare functor is essentially doing a precedes?
      // operator, returning true if and only if the first parameter
      // must precede the second parameter. If the second must precede
      // the first, or if they are equivalent, false should be
      // returned. The reason for this behavior is that it will be
      // called to test if two items are out of order and if true is
      // returned it will reverse the items. Therefore false is the
      // default return when it doesn't matter to prevent unnecessary
      // re-ordering.
      //
      // The template is supporting variations in sorting based on the
      // heap in question and allowing these variations to be handled
      // at compile-time.
      //
      // tag_field determines which tag is being used for comparison
      //
      // ready_opt determines how the ready flag influences the sort
      //
      // use_prop_delta determines whether the proportional delta is
      // added in for comparison
      template<double RequestTag::*tag_field,
               ReadyOption ready_opt,
               bool use_prop_delta>
      struct ClientCompare {
        bool operator()(const ClientRec& n1, const ClientRec& n2) const {
          if (n1.has_request()) {
            if (n2.has_request()) {
              const auto& t1 = n1.next_request().tag;
              const auto& t2 = n2.next_request().tag;
              if (ReadyOption::ignore == ready_opt || t1.ready == t2.ready) {
                // if we don't care about ready or the ready values are the same
                if (use_prop_delta) {
                  return (t1.*tag_field + n1.prop_delta) <
                    (t2.*tag_field + n2.prop_delta);
                } else {
                  return t1.*tag_field < t2.*tag_field;
                }
              } else if (ReadyOption::raises == ready_opt) {
                // use_ready == true && the ready fields are different
                return t1.ready;
              } else {
                return t2.ready;
              }
            } else {
              // n1 has request but n2 does not
              return true;
            }
          } else if (n2.has_request()) {
            // n2 has request but n1 does not
            return false;
          } else {
            // both have none; keep stable w false
            return false;
          }
        }
      };
760 | ||
11fdf7f2 TL |
      // callback used to (re-)fetch per-client dmClock parameters
      ClientInfoFunc client_info_f;
      // when true, client info is re-queried on every tag calculation
      static constexpr bool is_dynamic_cli_info_f = U1;

#ifdef WITH_SEASTAR
      // seastar builds are single-reactor; locking is a no-op
      static constexpr int data_mtx = 0;
      struct DataGuard { DataGuard(int) {} };
#else
      mutable std::mutex data_mtx;
      using DataGuard = std::lock_guard<decltype(data_mtx)>;
#endif

      // stable mapping between client ids and client queues
      std::map<C,ClientRecRef> client_map;

      // clients ordered by reservation tag
      c::IndIntruHeap<ClientRecRef,
                      ClientRec,
                      &ClientRec::reserv_heap_data,
                      ClientCompare<&RequestTag::reservation,
                                    ReadyOption::ignore,
                                    false>,
                      B> resv_heap;
#if USE_PROP_HEAP
      // clients ordered by proportion tag (see COMPILATION OPTIONS)
      c::IndIntruHeap<ClientRecRef,
                      ClientRec,
                      &ClientRec::prop_heap_data,
                      ClientCompare<&RequestTag::proportion,
                                    ReadyOption::ignore,
                                    true>,
                      B> prop_heap;
#endif
      // clients ordered by limit tag; a ready top sorts lower
      c::IndIntruHeap<ClientRecRef,
                      ClientRec,
                      &ClientRec::lim_heap_data,
                      ClientCompare<&RequestTag::limit,
                                    ReadyOption::lowers,
                                    false>,
                      B> limit_heap;
      // clients ordered by proportion tag; a ready top sorts higher
      c::IndIntruHeap<ClientRecRef,
                      ClientRec,
                      &ClientRec::ready_heap_data,
                      ClientCompare<&RequestTag::proportion,
                                    ReadyOption::raises,
                                    true>,
                      B> ready_heap;

      // behavior when a request would exceed its client's limit
      AtLimit at_limit;
      // only used with AtLimit::Reject; see RejectThreshold above
      RejectThreshold reject_threshold = 0;

      // window for the anticipation (deceptive idleness) mechanism
      double anticipation_timeout;
#ifdef WITH_SEASTAR
      bool finishing;
#else
      std::atomic_bool finishing;
#endif
      // every request creates a tick
      Counter tick = 0;

      // performance data collection
      size_t reserv_sched_count = 0;
      size_t prop_sched_count = 0;
      size_t limit_break_sched_count = 0;

      Duration idle_age;
      Duration erase_age;
      Duration check_time;
      std::deque<MarkPoint> clean_mark_points;
      // max number of clients to erase at a time
      Counter erase_max;
      // unfinished last erase point
      Counter last_erase_point = 0;

      // NB: All threads declared at end, so they're destructed first!

      std::unique_ptr<RunEvery> cleaning_job;

      // helper function to return the value of a variant if it matches the
      // given type T, or a default value of T otherwise
      template <typename T, typename Variant>
      static T get_or_default(const Variant& param, T default_value) {
        const T *p = boost::get<T>(&param);
        return p ? *p : default_value;
      }
7c673cae FG |
843 | |
      // COMMON constructor that others feed into; we can accept three
      // different variations of durations
      template<typename Rep, typename Per>
      PriorityQueueBase(ClientInfoFunc _client_info_f,
                        std::chrono::duration<Rep,Per> _idle_age,
                        std::chrono::duration<Rep,Per> _erase_age,
                        std::chrono::duration<Rep,Per> _check_time,
                        AtLimitParam at_limit_param,
                        double _anticipation_timeout) :
        client_info_f(_client_info_f),
        // at_limit_param may carry either an AtLimit or a
        // RejectThreshold (which implies AtLimit::Reject)
        at_limit(get_or_default(at_limit_param, AtLimit::Reject)),
        reject_threshold(get_or_default(at_limit_param, RejectThreshold{0})),
        anticipation_timeout(_anticipation_timeout),
        finishing(false),
        idle_age(std::chrono::duration_cast<Duration>(_idle_age)),
        erase_age(std::chrono::duration_cast<Duration>(_erase_age)),
        check_time(std::chrono::duration_cast<Duration>(_check_time)),
        erase_max(standard_erase_max)
      {
        // sanity: clients must go idle before they can be erased, and
        // the cleaner must run more often than the idle age
        assert(_erase_age >= _idle_age);
        assert(_check_time < _idle_age);
        // AtLimit::Reject depends on ImmediateTagCalc
        assert(at_limit != AtLimit::Reject || !IsDelayed);
        // start the periodic idle/erase cleaning job
        cleaning_job =
          std::unique_ptr<RunEvery>(
            new RunEvery(check_time,
                         std::bind(&PriorityQueueBase::do_clean, this)));
      }
872 | ||
873 | ||
      ~PriorityQueueBase() {
        // signal shutdown; cleaning_job is declared last among members,
        // so it is destroyed (and its thread stopped) first
        finishing = true;
      }
877 | ||
878 | ||
11fdf7f2 TL |
      // return the client's ClientInfo, first re-querying client_info_f
      // when the queue was configured for dynamic client info (U1)
      inline const ClientInfo* get_cli_info(ClientRec& client) const {
        if (is_dynamic_cli_info_f) {
          client.info = client_info_f(client.client);
        }
        return client.info;
      }
885 | ||
7c673cae | 886 | // data_mtx must be held by caller |
11fdf7f2 TL |
887 | RequestTag initial_tag(DelayedTagCalc delayed, ClientRec& client, |
888 | const ReqParams& params, Time time, Cost cost) { | |
889 | RequestTag tag(0, 0, 0, time, 0, 0, cost); | |
7c673cae | 890 | |
11fdf7f2 TL |
891 | // only calculate a tag if the request is going straight to the front |
892 | if (!client.has_request()) { | |
893 | const ClientInfo* client_info = get_cli_info(client); | |
894 | assert(client_info); | |
895 | tag = RequestTag(client.get_req_tag(), *client_info, | |
896 | params, time, cost, anticipation_timeout); | |
7c673cae | 897 | |
11fdf7f2 TL |
898 | // copy tag to previous tag for client |
899 | client.update_req_tag(tag, tick); | |
900 | } | |
901 | return tag; | |
902 | } | |
903 | ||
904 | // data_mtx must be held by caller | |
905 | RequestTag initial_tag(ImmediateTagCalc imm, ClientRec& client, | |
906 | const ReqParams& params, Time time, Cost cost) { | |
907 | // calculate the tag unconditionally | |
908 | const ClientInfo* client_info = get_cli_info(client); | |
909 | assert(client_info); | |
910 | RequestTag tag(client.get_req_tag(), *client_info, | |
911 | params, time, cost, anticipation_timeout); | |
912 | ||
913 | // copy tag to previous tag for client | |
914 | client.update_req_tag(tag, tick); | |
915 | return tag; | |
916 | } | |
917 | ||
918 | // data_mtx must be held by caller. returns 0 on success. when using | |
919 | // AtLimit::Reject, requests that would exceed their limit are rejected | |
920 | // with EAGAIN, and the queue will not take ownership of the given | |
921 | // 'request' argument | |
922 | int do_add_request(RequestRef&& request, | |
923 | const C& client_id, | |
924 | const ReqParams& req_params, | |
925 | const Time time, | |
926 | const Cost cost = 1u) { | |
927 | ++tick; | |
928 | ||
929 | auto insert = client_map.emplace(client_id, ClientRecRef{}); | |
930 | if (insert.second) { | |
931 | // new client entry | |
932 | const ClientInfo* info = client_info_f(client_id); | |
933 | auto client_rec = std::make_shared<ClientRec>(client_id, info, tick); | |
7c673cae FG |
934 | resv_heap.push(client_rec); |
935 | #if USE_PROP_HEAP | |
936 | prop_heap.push(client_rec); | |
937 | #endif | |
938 | limit_heap.push(client_rec); | |
939 | ready_heap.push(client_rec); | |
11fdf7f2 | 940 | insert.first->second = std::move(client_rec); |
7c673cae FG |
941 | } |
942 | ||
943 | // for convenience, we'll create a reference to the shared pointer | |
11fdf7f2 | 944 | ClientRec& client = *insert.first->second; |
7c673cae FG |
945 | |
946 | if (client.idle) { | |
947 | // We need to do an adjustment so that idle clients compete | |
948 | // fairly on proportional tags since those tags may have | |
949 | // drifted from real-time. Either use the lowest existing | |
950 | // proportion tag -- O(1) -- or the client with the lowest | |
951 | // previous proportion tag -- O(n) where n = # clients. | |
952 | // | |
f67539c2 | 953 | // So we don't have to maintain a proportional queue that |
7c673cae FG |
954 | // keeps the minimum on proportional tag alone (we're |
955 | // instead using a ready queue), we'll have to check each | |
956 | // client. | |
957 | // | |
958 | // The alternative would be to maintain a proportional queue | |
959 | // (define USE_PROP_TAG) and do an O(1) operation here. | |
960 | ||
961 | // Was unable to confirm whether equality testing on | |
962 | // std::numeric_limits<double>::max() is guaranteed, so | |
963 | // we'll use a compile-time calculated trigger that is one | |
964 | // third the max, which should be much larger than any | |
965 | // expected organic value. | |
966 | constexpr double lowest_prop_tag_trigger = | |
967 | std::numeric_limits<double>::max() / 3.0; | |
968 | ||
969 | double lowest_prop_tag = std::numeric_limits<double>::max(); | |
970 | for (auto const &c : client_map) { | |
971 | // don't use ourselves (or anything else that might be | |
972 | // listed as idle) since we're now in the map | |
973 | if (!c.second->idle) { | |
974 | double p; | |
975 | // use either lowest proportion tag or previous proportion tag | |
976 | if (c.second->has_request()) { | |
977 | p = c.second->next_request().tag.proportion + | |
978 | c.second->prop_delta; | |
979 | } else { | |
980 | p = c.second->get_req_tag().proportion + c.second->prop_delta; | |
981 | } | |
982 | ||
983 | if (p < lowest_prop_tag) { | |
984 | lowest_prop_tag = p; | |
985 | } | |
986 | } | |
987 | } | |
988 | ||
989 | // if this conditional does not fire, it | |
990 | if (lowest_prop_tag < lowest_prop_tag_trigger) { | |
991 | client.prop_delta = lowest_prop_tag - time; | |
992 | } | |
993 | client.idle = false; | |
994 | } // if this client was idle | |
995 | ||
11fdf7f2 | 996 | RequestTag tag = initial_tag(TagCalc{}, client, req_params, time, cost); |
7c673cae | 997 | |
11fdf7f2 TL |
998 | if (at_limit == AtLimit::Reject && |
999 | tag.limit > time + reject_threshold) { | |
1000 | // if the client is over its limit, reject it here | |
1001 | return EAGAIN; | |
7c673cae | 1002 | } |
7c673cae | 1003 | |
11fdf7f2 | 1004 | client.add_request(tag, std::move(request)); |
7c673cae FG |
1005 | if (1 == client.requests.size()) { |
1006 | // NB: can the following 4 calls to adjust be changed | |
1007 | // promote? Can adding a request ever demote a client in the | |
1008 | // heaps? | |
1009 | resv_heap.adjust(client); | |
1010 | limit_heap.adjust(client); | |
1011 | ready_heap.adjust(client); | |
1012 | #if USE_PROP_HEAP | |
1013 | prop_heap.adjust(client); | |
1014 | #endif | |
1015 | } | |
1016 | ||
1017 | client.cur_rho = req_params.rho; | |
1018 | client.cur_delta = req_params.delta; | |
1019 | ||
1020 | resv_heap.adjust(client); | |
1021 | limit_heap.adjust(client); | |
1022 | ready_heap.adjust(client); | |
1023 | #if USE_PROP_HEAP | |
1024 | prop_heap.adjust(client); | |
1025 | #endif | |
11fdf7f2 TL |
1026 | return 0; |
1027 | } // do_add_request | |
1028 | ||
      // data_mtx must be held by caller
      //
      // DelayedTagCalc flavor: after a request is popped, compute the
      // tag of the request now at the front of the client's queue,
      // chaining it from the just-popped tag (delayed tag calculation).
      void update_next_tag(DelayedTagCalc delayed, ClientRec& top,
			   const RequestTag& tag) {
	if (top.has_request()) {
	  // perform delayed tag calculation on the next request
	  ClientReq& next_first = top.next_request();
	  const ClientInfo* client_info = get_cli_info(top);
	  assert(client_info);
	  next_first.tag = RequestTag(tag, *client_info,
				      top.cur_delta, top.cur_rho,
				      next_first.tag.arrival,
				      next_first.tag.cost,
				      anticipation_timeout);
	  // copy tag to previous tag for client
	  top.update_req_tag(next_first.tag, tick);
	}
      }

      // ImmediateTagCalc flavor: intentionally a no-op.
      void update_next_tag(ImmediateTagCalc imm, ClientRec& top,
			   const RequestTag& tag) {
	// the next tag was already calculated on insertion
      }
7c673cae FG |
      // data_mtx should be held when called; top of heap should have
      // a ready request
      //
      // Pops the front request of the client at the top of the given
      // heap, re-tags that client's next request (delayed-calc mode),
      // re-positions the client in all four heaps, then hands the
      // popped request to 'process'. Returns the popped request's tag
      // so callers can, e.g., reduce reservation tags afterwards.
      template<typename C1, IndIntruHeapData ClientRec::*C2, typename C3>
      RequestTag pop_process_request(IndIntruHeap<C1, ClientRec, C2, C3, B>& heap,
				     std::function<void(const C& client,
							const Cost cost,
							RequestRef& request)> process) {
	// gain access to data
	ClientRec& top = heap.top();

	// copy out everything needed before the pop invalidates it
	Cost request_cost = top.next_request().tag.cost;
	RequestRef request = std::move(top.next_request().request);
	RequestTag tag = top.next_request().tag;

	// pop request and adjust heaps
	top.pop_request();

	// TagCalc dispatch: no-op for ImmediateTagCalc
	update_next_tag(TagCalc{}, top, tag);

	resv_heap.demote(top);
	limit_heap.adjust(top);
#if USE_PROP_HEAP
	prop_heap.demote(top);
#endif
	ready_heap.demote(top);

	// process (still under data_mtx; 'process' must not re-enter
	// the queue)
	process(top.client, request_cost, request);

	return tag;
      } // pop_process_request
1083 | ||
1084 | ||
11fdf7f2 TL |
1085 | // data_mtx must be held by caller |
1086 | void reduce_reservation_tags(DelayedTagCalc delayed, ClientRec& client, | |
1087 | const RequestTag& tag) { | |
1088 | if (!client.requests.empty()) { | |
1089 | // only maintain a tag for the first request | |
1090 | auto& r = client.requests.front(); | |
1091 | r.tag.reservation -= | |
20effc67 | 1092 | client.info->reservation_inv * (tag.cost + tag.rho); |
11fdf7f2 TL |
1093 | } |
1094 | } | |
1095 | ||
7c673cae | 1096 | // data_mtx should be held when called |
11fdf7f2 TL |
1097 | void reduce_reservation_tags(ImmediateTagCalc imm, ClientRec& client, |
1098 | const RequestTag& tag) { | |
1099 | double res_offset = | |
20effc67 | 1100 | client.info->reservation_inv * (tag.cost + tag.rho); |
7c673cae | 1101 | for (auto& r : client.requests) { |
11fdf7f2 | 1102 | r.tag.reservation -= res_offset; |
7c673cae | 1103 | } |
7c673cae FG |
1104 | } |
1105 | ||
7c673cae | 1106 | // data_mtx should be held when called |
11fdf7f2 | 1107 | void reduce_reservation_tags(const C& client_id, const RequestTag& tag) { |
7c673cae FG |
1108 | auto client_it = client_map.find(client_id); |
1109 | ||
1110 | // means the client was cleaned from map; should never happen | |
1111 | // as long as cleaning times are long enough | |
1112 | assert(client_map.end() != client_it); | |
11fdf7f2 TL |
1113 | ClientRec& client = *client_it->second; |
1114 | reduce_reservation_tags(TagCalc{}, client, tag); | |
1115 | ||
1116 | // don't forget to update previous tag | |
1117 | client.prev_tag.reservation -= | |
20effc67 | 1118 | client.info->reservation_inv * (tag.cost + tag.rho); |
11fdf7f2 | 1119 | resv_heap.promote(client); |
7c673cae FG |
1120 | } |
1121 | ||
1122 | ||
      // data_mtx should be held when called
      //
      // Core scheduling decision. Order of preference:
      //   1. a request whose reservation tag has come due;
      //   2. a "ready" request (under its limit) by proportion tag;
      //   3. with AtLimit::Allow only, a limit-breaking request by
      //      proportion, else by reservation.
      // If nothing is runnable now, returns the earliest future time
      // at which something will be (NextReq(next_call)), or
      // NextReq::none() when no client has any request.
      NextReq do_next_request(Time now) {
	// if reservation queue is empty, all are empty (i.e., no
	// active clients)
	if(resv_heap.empty()) {
	  return NextReq::none();
	}

	// try constraint (reservation) based scheduling

	auto& reserv = resv_heap.top();
	if (reserv.has_request() &&
	    reserv.next_request().tag.reservation <= now) {
	  return NextReq(HeapId::reservation);
	}

	// no existing reservations before now, so try weight-based
	// scheduling

	// all items that are within limit are eligible based on
	// priority; mark them ready, moving them from the limit heap's
	// ordering into the ready heap's
	auto limits = &limit_heap.top();
	while (limits->has_request() &&
	       !limits->next_request().tag.ready &&
	       limits->next_request().tag.limit <= now) {
	  limits->next_request().tag.ready = true;
	  ready_heap.promote(*limits);
	  limit_heap.demote(*limits);

	  limits = &limit_heap.top();
	}

	auto& readys = ready_heap.top();
	if (readys.has_request() &&
	    readys.next_request().tag.ready &&
	    readys.next_request().tag.proportion < max_tag) {
	  return NextReq(HeapId::ready);
	}

	// if nothing is schedulable by reservation or
	// proportion/weight, and if we allow limit break, try to
	// schedule something with the lowest proportion tag or
	// alternatively lowest reservation tag.
	if (at_limit == AtLimit::Allow) {
	  if (readys.has_request() &&
	      readys.next_request().tag.proportion < max_tag) {
	    return NextReq(HeapId::ready);
	  } else if (reserv.has_request() &&
		     reserv.next_request().tag.reservation < max_tag) {
	    return NextReq(HeapId::reservation);
	  }
	}

	// nothing scheduled; make sure we re-run when next
	// reservation item or next limited item comes up

	Time next_call = TimeMax;
	if (resv_heap.top().has_request()) {
	  next_call =
	    min_not_0_time(next_call,
			   resv_heap.top().next_request().tag.reservation);
	}
	if (limit_heap.top().has_request()) {
	  const auto& next = limit_heap.top().next_request();
	  assert(!next.tag.ready || max_tag == next.tag.proportion);
	  next_call = min_not_0_time(next_call, next.tag.limit);
	}
	if (next_call < TimeMax) {
	  return NextReq(next_call);
	} else {
	  return NextReq::none();
	}
      } // do_next_request
1196 | ||
1197 | ||
1198 | // if possible is not zero and less than current then return it; | |
1199 | // otherwise return current; the idea is we're trying to find | |
1200 | // the minimal time but ignoring zero | |
1201 | static inline const Time& min_not_0_time(const Time& current, | |
1202 | const Time& possible) { | |
1203 | return TimeZero == possible ? current : std::min(current, possible); | |
1204 | } | |
1205 | ||
1206 | ||
      /*
       * This is being called regularly by RunEvery. Every time it's
       * called it notes the time and delta counter (mark point) in a
       * deque. It also looks at the deque to find the most recent
       * mark point that is older than clean_age. It then walks the
       * map and delete all server entries that were last used before
       * that mark point.
       */
      void do_clean() {
	TimePoint now = std::chrono::steady_clock::now();
	DataGuard g(data_mtx);
	clean_mark_points.emplace_back(MarkPoint(now, tick));

	// first erase the super-old client records

	Counter erase_point = last_erase_point;
	auto point = clean_mark_points.front();
	// NB: the deque cannot drain here — the mark point just added
	// above carries time 'now', which fails the loop condition
	while (point.first <= now - erase_age) {
	  last_erase_point = point.second;
	  erase_point = last_erase_point;
	  clean_mark_points.pop_front();
	  point = clean_mark_points.front();
	}

	// newest mark point old enough to define "idle since"
	Counter idle_point = 0;
	for (auto i : clean_mark_points) {
	  if (i.first <= now - idle_age) {
	    idle_point = i.second;
	  } else {
	    break;
	  }
	}

	Counter erased_num = 0;
	if (erase_point > 0 || idle_point > 0) {
	  // erase clients untouched since erase_point (capped at
	  // erase_max per pass), mark clients untouched since
	  // idle_point as idle
	  for (auto i = client_map.begin(); i != client_map.end(); /* empty */) {
	    auto i2 = i++;
	    if (erase_point &&
		erased_num < erase_max &&
		i2->second->last_tick <= erase_point) {
	      delete_from_heaps(i2->second);
	      client_map.erase(i2);
	      erased_num++;
	    } else if (idle_point && i2->second->last_tick <= idle_point) {
	      i2->second->idle = true;
	    }
	  } // for

	  // if the per-pass cap was hit, re-run sooner (aggressively)
	  // to finish the backlog; otherwise reset and resume the
	  // normal cadence
	  auto wperiod = check_time;
	  if (erased_num >= erase_max) {
	    wperiod = duration_cast<milliseconds>(aggressive_check_time);
	  } else {
	    // clean finished, refresh
	    last_erase_point = 0;
	  }
	  cleaning_job->try_update(wperiod);
	} // if
      } // do_clean
1265 | ||
1266 | ||
1267 | // data_mtx must be held by caller | |
1268 | template<IndIntruHeapData ClientRec::*C1,typename C2> | |
1269 | void delete_from_heap(ClientRecRef& client, | |
1270 | c::IndIntruHeap<ClientRecRef,ClientRec,C1,C2,B>& heap) { | |
11fdf7f2 | 1271 | auto i = heap.at(client); |
7c673cae FG |
1272 | heap.remove(i); |
1273 | } | |
1274 | ||
1275 | ||
1276 | // data_mtx must be held by caller | |
1277 | void delete_from_heaps(ClientRecRef& client) { | |
1278 | delete_from_heap(client, resv_heap); | |
1279 | #if USE_PROP_HEAP | |
1280 | delete_from_heap(client, prop_heap); | |
1281 | #endif | |
1282 | delete_from_heap(client, limit_heap); | |
1283 | delete_from_heap(client, ready_heap); | |
1284 | } | |
1285 | }; // class PriorityQueueBase | |
1286 | ||
1287 | ||
    // Pull-mode queue: the server thread calls pull_request() whenever
    // it has capacity; nothing is scheduled proactively.
    template<typename C, typename R, bool IsDelayed=false, bool U1=false, unsigned B=2>
    class PullPriorityQueue : public PriorityQueueBase<C,R,IsDelayed,U1,B> {
      using super = PriorityQueueBase<C,R,IsDelayed,U1,B>;

    public:

      // When a request is pulled, this is the return type.
      struct PullReq {
	struct Retn {
	  C client;                          // which client the request belongs to
	  typename super::RequestRef request; // ownership transferred to caller
	  PhaseType phase;                   // reservation vs priority phase
	  Cost cost;                         // cost recorded on the request's tag
	};

	// discriminates the variant below: returning -> Retn,
	// future -> Time, none -> neither is meaningful
	typename super::NextReqType type;
	boost::variant<Retn,Time> data;

	bool is_none() const { return type == super::NextReqType::none; }

	bool is_retn() const { return type == super::NextReqType::returning; }
	Retn& get_retn() {
	  return boost::get<Retn>(data);
	}

	bool is_future() const { return type == super::NextReqType::future; }
	Time getTime() const { return boost::get<Time>(data); }
      };


#ifdef PROFILE
      ProfileTimer<std::chrono::nanoseconds> pull_request_timer;
      ProfileTimer<std::chrono::nanoseconds> add_request_timer;
#endif

      // pull full constructor
      template<typename Rep, typename Per>
      PullPriorityQueue(typename super::ClientInfoFunc _client_info_f,
			std::chrono::duration<Rep,Per> _idle_age,
			std::chrono::duration<Rep,Per> _erase_age,
			std::chrono::duration<Rep,Per> _check_time,
			AtLimitParam at_limit_param = AtLimit::Wait,
			double _anticipation_timeout = 0.0) :
	super(_client_info_f,
	      _idle_age, _erase_age, _check_time,
	      at_limit_param, _anticipation_timeout)
      {
	// empty
      }


      // pull convenience constructor: uses the standard aging/cleaning
      // intervals
      PullPriorityQueue(typename super::ClientInfoFunc _client_info_f,
			AtLimitParam at_limit_param = AtLimit::Wait,
			double _anticipation_timeout = 0.0) :
	PullPriorityQueue(_client_info_f,
			  standard_idle_age,
			  standard_erase_age,
			  standard_check_time,
			  at_limit_param,
			  _anticipation_timeout)
      {
	// empty
      }


      // copy R onto the heap, stamp with current time
      int add_request(R&& request,
		      const C& client_id,
		      const ReqParams& req_params,
		      const Cost cost = 1u) {
	return add_request(typename super::RequestRef(new R(std::move(request))),
			   client_id,
			   req_params,
			   get_time(),
			   cost);
      }


      // as above, with default (empty) request parameters
      int add_request(R&& request,
		      const C& client_id,
		      const Cost cost = 1u) {
	static const ReqParams null_req_params;
	return add_request(typename super::RequestRef(new R(std::move(request))),
			   client_id,
			   null_req_params,
			   get_time(),
			   cost);
      }


      // caller supplies the arrival time explicitly
      int add_request_time(R&& request,
			   const C& client_id,
			   const ReqParams& req_params,
			   const Time time,
			   const Cost cost = 1u) {
	return add_request(typename super::RequestRef(new R(std::move(request))),
			   client_id,
			   req_params,
			   time,
			   cost);
      }


      int add_request(typename super::RequestRef&& request,
		      const C& client_id,
		      const ReqParams& req_params,
		      const Cost cost = 1u) {
	return add_request(std::move(request), client_id, req_params, get_time(), cost);
      }


      int add_request(typename super::RequestRef&& request,
		      const C& client_id,
		      const Cost cost = 1u) {
	static const ReqParams null_req_params;
	return add_request(std::move(request), client_id, null_req_params, get_time(), cost);
      }


      // this does the work; the versions above provide alternate interfaces.
      // returns 0 on success; EAGAIN under AtLimit::Reject (see
      // do_add_request)
      int add_request(typename super::RequestRef&& request,
		      const C& client_id,
		      const ReqParams& req_params,
		      const Time time,
		      const Cost cost = 1u) {
	typename super::DataGuard g(this->data_mtx);
#ifdef PROFILE
	add_request_timer.start();
#endif
	int r = super::do_add_request(std::move(request),
				      client_id,
				      req_params,
				      time,
				      cost);
	// no call to schedule_request for pull version
#ifdef PROFILE
	add_request_timer.stop();
#endif
	return r;
      }


      inline PullReq pull_request() {
	return pull_request(get_time());
      }


      // Ask the scheduler for the next request as of 'now'. The result
      // is either a request (returning), a wakeup time (future), or
      // none.
      PullReq pull_request(const Time now) {
	PullReq result;
	typename super::DataGuard g(this->data_mtx);
#ifdef PROFILE
	pull_request_timer.start();
#endif

	typename super::NextReq next = super::do_next_request(now);
	result.type = next.type;
	switch(next.type) {
	case super::NextReqType::none:
	  return result;
	case super::NextReqType::future:
	  result.data = next.when_ready;
	  return result;
	case super::NextReqType::returning:
	  // to avoid nesting, break out and let code below handle this case
	  break;
	default:
	  assert(false);
	}

	// we'll only get here if we're returning an entry

	// builds the callback handed to pop_process_request; it fills
	// result.data with the popped request
	auto process_f =
	  [&] (PullReq& pull_result, PhaseType phase) ->
	  std::function<void(const C&,
			     uint64_t,
			     typename super::RequestRef&)> {
	  return [&pull_result, phase](const C& client,
				       const Cost request_cost,
				       typename super::RequestRef& request) {
	    pull_result.data = typename PullReq::Retn{ client,
						       std::move(request),
						       phase,
						       request_cost };
	  };
	};

	switch(next.heap_id) {
	case super::HeapId::reservation:
	  (void) super::pop_process_request(this->resv_heap,
					    process_f(result,
						      PhaseType::reservation));
	  ++this->reserv_sched_count;
	  break;
	case super::HeapId::ready:
	  {
	    auto tag = super::pop_process_request(this->ready_heap,
						  process_f(result, PhaseType::priority));
	    // need to use retn temporarily
	    auto& retn = boost::get<typename PullReq::Retn>(result.data);
	    // a priority-phase service pulls the client's reservation
	    // tags earlier
	    super::reduce_reservation_tags(retn.client, tag);
	  }
	  ++this->prop_sched_count;
	  break;
	default:
	  assert(false);
	}

#ifdef PROFILE
	pull_request_timer.stop();
#endif
	return result;
      } // pull_request


    protected:


      // data_mtx should be held when called; unfortunately this
      // function has to be repeated in both push & pull
      // specializations
      //
      // NOTE(review): no next_request(Time) overload is visible in
      // this class in this chunk; confirm which overload the call
      // below resolves to.
      typename super::NextReq next_request() {
	return next_request(get_time());
      }
    }; // class PullPriorityQueue
1511 | ||
20effc67 TL |
1512 | #ifndef WITH_SEASTAR |
1513 | // TODO: PushPriorityQueue is not ported to seastar yet | |
7c673cae | 1514 | // PUSH version |
9f95a23c | 1515 | template<typename C, typename R, bool IsDelayed=false, bool U1=false, unsigned B=2> |
11fdf7f2 | 1516 | class PushPriorityQueue : public PriorityQueueBase<C,R,IsDelayed,U1,B> { |
7c673cae FG |
1517 | |
1518 | protected: | |
1519 | ||
11fdf7f2 | 1520 | using super = PriorityQueueBase<C,R,IsDelayed,U1,B>; |
7c673cae FG |
1521 | |
1522 | public: | |
1523 | ||
1524 | // a function to see whether the server can handle another request | |
1525 | using CanHandleRequestFunc = std::function<bool(void)>; | |
1526 | ||
1527 | // a function to submit a request to the server; the second | |
1528 | // parameter is a callback when it's completed | |
1529 | using HandleRequestFunc = | |
11fdf7f2 | 1530 | std::function<void(const C&,typename super::RequestRef,PhaseType,uint64_t)>; |
7c673cae FG |
1531 | |
1532 | protected: | |
1533 | ||
1534 | CanHandleRequestFunc can_handle_f; | |
1535 | HandleRequestFunc handle_f; | |
1536 | // for handling timed scheduling | |
1537 | std::mutex sched_ahead_mtx; | |
1538 | std::condition_variable sched_ahead_cv; | |
1539 | Time sched_ahead_when = TimeZero; | |
1540 | ||
1541 | #ifdef PROFILE | |
1542 | public: | |
1543 | ProfileTimer<std::chrono::nanoseconds> add_request_timer; | |
1544 | ProfileTimer<std::chrono::nanoseconds> request_complete_timer; | |
1545 | protected: | |
1546 | #endif | |
1547 | ||
1548 | // NB: threads declared last, so constructed last and destructed first | |
1549 | ||
1550 | std::thread sched_ahead_thd; | |
1551 | ||
1552 | public: | |
1553 | ||
      // push full constructor; starts the sched-ahead thread, so the
      // callbacks must be valid before construction completes
      template<typename Rep, typename Per>
      PushPriorityQueue(typename super::ClientInfoFunc _client_info_f,
			CanHandleRequestFunc _can_handle_f,
			HandleRequestFunc _handle_f,
			std::chrono::duration<Rep,Per> _idle_age,
			std::chrono::duration<Rep,Per> _erase_age,
			std::chrono::duration<Rep,Per> _check_time,
			AtLimitParam at_limit_param = AtLimit::Wait,
			double anticipation_timeout = 0.0) :
	super(_client_info_f,
	      _idle_age, _erase_age, _check_time,
	      at_limit_param, anticipation_timeout)
      {
	can_handle_f = _can_handle_f;
	handle_f = _handle_f;
	// started last, after all members above are in place
	sched_ahead_thd = std::thread(&PushPriorityQueue::run_sched_ahead, this);
      }


      // push convenience constructor: standard aging/cleaning intervals
      PushPriorityQueue(typename super::ClientInfoFunc _client_info_f,
			CanHandleRequestFunc _can_handle_f,
			HandleRequestFunc _handle_f,
			AtLimitParam at_limit_param = AtLimit::Wait,
			double _anticipation_timeout = 0.0) :
	PushPriorityQueue(_client_info_f,
			  _can_handle_f,
			  _handle_f,
			  standard_idle_age,
			  standard_erase_age,
			  standard_check_time,
			  at_limit_param,
			  _anticipation_timeout)
      {
	// empty
      }


      // signal shutdown, wake the sched-ahead thread (notify under the
      // lock so the wakeup cannot be missed), and join it
      ~PushPriorityQueue() {
	this->finishing = true;
	{
	  std::lock_guard<std::mutex> l(sched_ahead_mtx);
	  sched_ahead_cv.notify_one();
	}
	sched_ahead_thd.join();
      }

    public:
1603 | ||
11fdf7f2 TL |
1604 | int add_request(R&& request, |
1605 | const C& client_id, | |
1606 | const ReqParams& req_params, | |
1607 | const Cost cost = 1u) { | |
1608 | return add_request(typename super::RequestRef(new R(std::move(request))), | |
1609 | client_id, | |
1610 | req_params, | |
1611 | get_time(), | |
1612 | cost); | |
7c673cae FG |
1613 | } |
1614 | ||
1615 | ||
11fdf7f2 TL |
1616 | int add_request(typename super::RequestRef&& request, |
1617 | const C& client_id, | |
1618 | const ReqParams& req_params, | |
1619 | const Cost cost = 1u) { | |
20effc67 | 1620 | return add_request(std::move(request), client_id, req_params, get_time(), cost); |
7c673cae FG |
1621 | } |
1622 | ||
1623 | ||
11fdf7f2 TL |
1624 | int add_request_time(const R& request, |
1625 | const C& client_id, | |
1626 | const ReqParams& req_params, | |
1627 | const Time time, | |
1628 | const Cost cost = 1u) { | |
1629 | return add_request(typename super::RequestRef(new R(request)), | |
1630 | client_id, | |
1631 | req_params, | |
1632 | time, | |
1633 | cost); | |
7c673cae FG |
1634 | } |
1635 | ||
1636 | ||
11fdf7f2 TL |
1637 | int add_request(typename super::RequestRef&& request, |
1638 | const C& client_id, | |
1639 | const ReqParams& req_params, | |
1640 | const Time time, | |
1641 | const Cost cost = 1u) { | |
7c673cae FG |
1642 | typename super::DataGuard g(this->data_mtx); |
1643 | #ifdef PROFILE | |
1644 | add_request_timer.start(); | |
1645 | #endif | |
11fdf7f2 TL |
1646 | int r = super::do_add_request(std::move(request), |
1647 | client_id, | |
1648 | req_params, | |
1649 | time, | |
1650 | cost); | |
1651 | if (r == 0) { | |
20effc67 | 1652 | (void) schedule_request(); |
11fdf7f2 | 1653 | } |
7c673cae FG |
1654 | #ifdef PROFILE |
1655 | add_request_timer.stop(); | |
1656 | #endif | |
11fdf7f2 | 1657 | return r; |
7c673cae FG |
1658 | } |
1659 | ||
      // Called by the server when it finishes a request, freeing
      // capacity; tries to push the next schedulable request.
      void request_completed() {
	typename super::DataGuard g(this->data_mtx);
#ifdef PROFILE
	request_complete_timer.start();
#endif
	(void) schedule_request();
#ifdef PROFILE
	request_complete_timer.stop();
#endif
      }
1671 | ||
    protected:

      // data_mtx should be held when called; furthermore, the heap
      // should not be empty and the top element of the heap should
      // not be already handled
      //
      // Pops the top request of the given heap and hands it to
      // handle_f; returns the (client, tag) pair so the caller can
      // adjust reservation tags if needed.
      //
      // NOTE: the use of "super::ClientRec" in either the template
      // construct or as a parameter to submit_top_request generated
      // a compiler error in g++ 4.8.4, when ClientRec was
      // "protected" rather than "public". By g++ 6.3.1 this was not
      // an issue. But for backwards compatibility
      // PriorityQueueBase::ClientRec is public.
      template<typename C1,
	       IndIntruHeapData super::ClientRec::*C2,
	       typename C3,
	       unsigned B4>
      typename super::RequestMeta
      submit_top_request(IndIntruHeap<C1,typename super::ClientRec,C2,C3,B4>& heap,
			 PhaseType phase) {
	C client_result;
	RequestTag tag = super::pop_process_request(heap,
						    [this, phase, &client_result]
						    (const C& client,
						     const Cost request_cost,
						     typename super::RequestRef& request) {
						      client_result = client;
						      handle_f(client, std::move(request), phase, request_cost);
						    });
	typename super::RequestMeta req(client_result, tag);
	return req;
      }


      // data_mtx should be held when called
      //
      // Dispatches to the heap selected by do_next_request and
      // performs the phase-specific bookkeeping.
      void submit_request(typename super::HeapId heap_id) {
	switch(heap_id) {
	case super::HeapId::reservation:
	  // don't need to note client
	  (void) submit_top_request(this->resv_heap, PhaseType::reservation);
	  // unlike the other two cases, we do not reduce reservation
	  // tags here
	  ++this->reserv_sched_count;
	  break;
	case super::HeapId::ready:
	  {
	    auto req = submit_top_request(this->ready_heap, PhaseType::priority);
	    super::reduce_reservation_tags(req.client_id, req.tag);
	  }
	  ++this->prop_sched_count;
	  break;
	default:
	  assert(false);
	}
      } // submit_request
1726 | ||
1727 | ||
1728 | // data_mtx should be held when called; unfortunately this | |
1729 | // function has to be repeated in both push & pull | |
1730 | // specializations | |
1731 | typename super::NextReq next_request() { | |
1732 | return next_request(get_time()); | |
1733 | } | |
1734 | ||
1735 | ||
1736 | // data_mtx should be held when called; overrides member | |
1737 | // function in base class to add check for whether a request can | |
1738 | // be pushed to the server | |
1739 | typename super::NextReq next_request(Time now) { | |
1740 | if (!can_handle_f()) { | |
1741 | typename super::NextReq result; | |
1742 | result.type = super::NextReqType::none; | |
1743 | return result; | |
1744 | } else { | |
1745 | return super::do_next_request(now); | |
1746 | } | |
1747 | } // next_request | |
1748 | ||
1749 | ||
1750 | // data_mtx should be held when called | |
20effc67 | 1751 | typename super::NextReqType schedule_request() { |
7c673cae FG |
1752 | typename super::NextReq next_req = next_request(); |
1753 | switch (next_req.type) { | |
1754 | case super::NextReqType::none: | |
20effc67 | 1755 | break; |
7c673cae FG |
1756 | case super::NextReqType::future: |
1757 | sched_at(next_req.when_ready); | |
1758 | break; | |
1759 | case super::NextReqType::returning: | |
1760 | submit_request(next_req.heap_id); | |
1761 | break; | |
1762 | default: | |
1763 | assert(false); | |
1764 | } | |
20effc67 | 1765 | return next_req.type; |
7c673cae FG |
1766 | } |
1767 | ||
1768 | ||
1769 | // this is the thread that handles running schedule_request at | |
1770 | // future times when nothing can be scheduled immediately | |
1771 | void run_sched_ahead() { | |
1772 | std::unique_lock<std::mutex> l(sched_ahead_mtx); | |
1773 | ||
1774 | while (!this->finishing) { | |
f67539c2 | 1775 | // predicate for cond.wait() |
20effc67 TL |
1776 | const auto pred = [this] () -> bool { |
1777 | return this->finishing || sched_ahead_when > TimeZero; | |
1778 | }; | |
f67539c2 | 1779 | |
7c673cae | 1780 | if (TimeZero == sched_ahead_when) { |
f67539c2 | 1781 | sched_ahead_cv.wait(l, pred); |
7c673cae | 1782 | } else { |
f67539c2 TL |
1783 | // cast from Time -> duration<Time> -> Duration -> TimePoint |
1784 | const auto until = typename super::TimePoint{ | |
1785 | duration_cast<typename super::Duration>( | |
1786 | std::chrono::duration<Time>{sched_ahead_when})}; | |
1787 | sched_ahead_cv.wait_until(l, until, pred); | |
7c673cae FG |
1788 | sched_ahead_when = TimeZero; |
1789 | if (this->finishing) return; | |
1790 | ||
1791 | l.unlock(); | |
1792 | if (!this->finishing) { | |
20effc67 TL |
1793 | do { |
1794 | typename super::DataGuard g(this->data_mtx); | |
1795 | if (schedule_request() == super::NextReqType::future) | |
1796 | break; | |
1797 | } while (!this->empty()); | |
1798 | } | |
7c673cae FG |
1799 | l.lock(); |
1800 | } | |
1801 | } | |
1802 | } | |
1803 | ||
1804 | ||
1805 | void sched_at(Time when) { | |
1806 | std::lock_guard<std::mutex> l(sched_ahead_mtx); | |
31f18b77 | 1807 | if (this->finishing) return; |
7c673cae FG |
1808 | if (TimeZero == sched_ahead_when || when < sched_ahead_when) { |
1809 | sched_ahead_when = when; | |
1810 | sched_ahead_cv.notify_one(); | |
1811 | } | |
1812 | } | |
1813 | }; // class PushPriorityQueue | |
20effc67 | 1814 | #endif // !WITH_SEASTAR |
7c673cae FG |
1815 | } // namespace dmclock |
1816 | } // namespace crimson |