]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | /* | |
5 | * Copyright (C) 2017 Red Hat Inc. | |
11fdf7f2 TL |
6 | * |
7 | * Author: J. Eric Ivancich <ivancich@redhat.com> | |
8 | * | |
9 | * This is free software; you can redistribute it and/or modify it | |
10 | * under the terms of the GNU Lesser General Public License version | |
11 | * 2.1, as published by the Free Software Foundation. See file | |
12 | * COPYING. | |
7c673cae FG |
13 | */ |
14 | ||
15 | ||
16 | #pragma once | |
17 | ||
18 | /* COMPILATION OPTIONS | |
7c673cae FG |
19 | * |
20 | * The prop_heap does not seem to be necessary. The only thing it | |
11fdf7f2 | 21 | * would help with is quickly finding the minimum proportion/prioity |
7c673cae FG |
22 | * when an idle client became active. To have the code maintain the |
23 | * proportional heap, define USE_PROP_HEAP (i.e., compiler argument | |
24 | * -DUSE_PROP_HEAP). | |
25 | */ | |
26 | ||
27 | #include <assert.h> | |
28 | ||
29 | #include <cmath> | |
30 | #include <memory> | |
31 | #include <map> | |
32 | #include <deque> | |
33 | #include <queue> | |
34 | #include <atomic> | |
35 | #include <mutex> | |
36 | #include <condition_variable> | |
37 | #include <thread> | |
38 | #include <iostream> | |
39 | #include <sstream> | |
40 | #include <limits> | |
41 | ||
42 | #include <boost/variant.hpp> | |
43 | ||
44 | #include "indirect_intrusive_heap.h" | |
45 | #include "run_every.h" | |
46 | #include "dmclock_util.h" | |
47 | #include "dmclock_recs.h" | |
48 | ||
49 | #ifdef PROFILE | |
50 | #include "profile.h" | |
51 | #endif | |
52 | ||
7c673cae FG |
53 | |
54 | namespace crimson { | |
55 | ||
56 | namespace dmclock { | |
57 | ||
58 | namespace c = crimson; | |
59 | ||
60 | constexpr double max_tag = std::numeric_limits<double>::is_iec559 ? | |
61 | std::numeric_limits<double>::infinity() : | |
62 | std::numeric_limits<double>::max(); | |
63 | constexpr double min_tag = std::numeric_limits<double>::is_iec559 ? | |
64 | -std::numeric_limits<double>::infinity() : | |
65 | std::numeric_limits<double>::lowest(); | |
66 | constexpr uint tag_modulo = 1000000; | |
67 | ||
11fdf7f2 TL |
68 | constexpr auto standard_idle_age = std::chrono::seconds(300); |
69 | constexpr auto standard_erase_age = std::chrono::seconds(600); | |
70 | constexpr auto standard_check_time = std::chrono::seconds(60); | |
71 | constexpr auto aggressive_check_time = std::chrono::seconds(5); | |
72 | constexpr uint standard_erase_max = 2000; | |
73 | ||
74 | enum class AtLimit { | |
75 | // requests are delayed until the limit is restored | |
76 | Wait, | |
77 | // requests are allowed to exceed their limit, if all other reservations | |
78 | // are met and below their limits | |
79 | Allow, | |
80 | // if an incoming request would exceed its limit, add_request() will | |
81 | // reject it with EAGAIN instead of adding it to the queue. cannot be used | |
82 | // with DelayedTagCalc, because add_request() needs an accurate tag | |
83 | Reject, | |
84 | }; | |
85 | ||
86 | // when AtLimit::Reject is used, only start rejecting requests once their | |
87 | // limit is above this threshold. requests under this threshold are | |
88 | // enqueued and processed like AtLimit::Wait | |
89 | using RejectThreshold = Time; | |
90 | ||
91 | // the AtLimit constructor parameter can either accept AtLimit or a value | |
92 | // for RejectThreshold (which implies AtLimit::Reject) | |
93 | using AtLimitParam = boost::variant<AtLimit, RejectThreshold>; | |
94 | ||
7c673cae | 95 | struct ClientInfo { |
11fdf7f2 TL |
96 | double reservation; // minimum |
97 | double weight; // proportional | |
98 | double limit; // maximum | |
7c673cae FG |
99 | |
100 | // multiplicative inverses of above, which we use in calculations | |
101 | // and don't want to recalculate repeatedly | |
11fdf7f2 TL |
102 | double reservation_inv; |
103 | double weight_inv; | |
104 | double limit_inv; | |
7c673cae FG |
105 | |
106 | // order parameters -- min, "normal", max | |
107 | ClientInfo(double _reservation, double _weight, double _limit) : | |
108 | reservation(_reservation), | |
109 | weight(_weight), | |
110 | limit(_limit), | |
111 | reservation_inv(0.0 == reservation ? 0.0 : 1.0 / reservation), | |
112 | weight_inv( 0.0 == weight ? 0.0 : 1.0 / weight), | |
113 | limit_inv( 0.0 == limit ? 0.0 : 1.0 / limit) | |
114 | { | |
115 | // empty | |
116 | } | |
117 | ||
118 | ||
119 | friend std::ostream& operator<<(std::ostream& out, | |
120 | const ClientInfo& client) { | |
121 | out << | |
122 | "{ ClientInfo:: r:" << client.reservation << | |
123 | " w:" << std::fixed << client.weight << | |
124 | " l:" << std::fixed << client.limit << | |
125 | " 1/r:" << std::fixed << client.reservation_inv << | |
126 | " 1/w:" << std::fixed << client.weight_inv << | |
127 | " 1/l:" << std::fixed << client.limit_inv << | |
128 | " }"; | |
129 | return out; | |
130 | } | |
131 | }; // class ClientInfo | |
132 | ||
133 | ||
134 | struct RequestTag { | |
11fdf7f2 TL |
135 | double reservation; |
136 | double proportion; | |
137 | double limit; | |
138 | uint32_t delta; | |
139 | uint32_t rho; | |
140 | Cost cost; | |
141 | bool ready; // true when within limit | |
142 | Time arrival; | |
7c673cae FG |
143 | |
144 | RequestTag(const RequestTag& prev_tag, | |
145 | const ClientInfo& client, | |
11fdf7f2 TL |
146 | const uint32_t _delta, |
147 | const uint32_t _rho, | |
31f18b77 | 148 | const Time time, |
11fdf7f2 TL |
149 | const Cost _cost = 1u, |
150 | const double anticipation_timeout = 0.0) : | |
151 | delta(_delta), | |
152 | rho(_rho), | |
153 | cost(_cost), | |
154 | ready(false), | |
155 | arrival(time) | |
7c673cae | 156 | { |
11fdf7f2 TL |
157 | assert(cost > 0); |
158 | Time max_time = time; | |
159 | if (time - anticipation_timeout < prev_tag.arrival) | |
160 | max_time -= anticipation_timeout; | |
161 | ||
162 | reservation = tag_calc(max_time, | |
163 | prev_tag.reservation, | |
164 | client.reservation_inv, | |
165 | rho, | |
166 | true, | |
167 | cost); | |
168 | proportion = tag_calc(max_time, | |
169 | prev_tag.proportion, | |
170 | client.weight_inv, | |
171 | delta, | |
172 | true, | |
173 | cost); | |
174 | limit = tag_calc(max_time, | |
175 | prev_tag.limit, | |
176 | client.limit_inv, | |
177 | delta, | |
178 | false, | |
179 | cost); | |
180 | ||
7c673cae FG |
181 | assert(reservation < max_tag || proportion < max_tag); |
182 | } | |
183 | ||
31f18b77 FG |
184 | RequestTag(const RequestTag& prev_tag, |
185 | const ClientInfo& client, | |
186 | const ReqParams req_params, | |
187 | const Time time, | |
11fdf7f2 TL |
188 | const Cost cost = 1u, |
189 | const double anticipation_timeout = 0.0) : | |
190 | RequestTag(prev_tag, client, req_params.delta, req_params.rho, time, | |
191 | cost, anticipation_timeout) | |
31f18b77 FG |
192 | { /* empty */ } |
193 | ||
11fdf7f2 TL |
194 | RequestTag(const double _res, const double _prop, const double _lim, |
195 | const Time _arrival, | |
196 | const uint32_t _delta = 0, | |
197 | const uint32_t _rho = 0, | |
198 | const Cost _cost = 1u) : | |
7c673cae FG |
199 | reservation(_res), |
200 | proportion(_prop), | |
201 | limit(_lim), | |
11fdf7f2 TL |
202 | delta(_delta), |
203 | rho(_rho), | |
204 | cost(_cost), | |
205 | ready(false), | |
206 | arrival(_arrival) | |
7c673cae | 207 | { |
11fdf7f2 | 208 | assert(cost > 0); |
7c673cae FG |
209 | assert(reservation < max_tag || proportion < max_tag); |
210 | } | |
211 | ||
212 | RequestTag(const RequestTag& other) : | |
213 | reservation(other.reservation), | |
214 | proportion(other.proportion), | |
215 | limit(other.limit), | |
11fdf7f2 TL |
216 | delta(other.delta), |
217 | rho(other.rho), | |
218 | cost(other.cost), | |
219 | ready(other.ready), | |
220 | arrival(other.arrival) | |
221 | { /* empty */ } | |
7c673cae FG |
222 | |
223 | static std::string format_tag_change(double before, double after) { | |
224 | if (before == after) { | |
225 | return std::string("same"); | |
226 | } else { | |
227 | std::stringstream ss; | |
228 | ss << format_tag(before) << "=>" << format_tag(after); | |
229 | return ss.str(); | |
230 | } | |
231 | } | |
232 | ||
233 | static std::string format_tag(double value) { | |
234 | if (max_tag == value) { | |
235 | return std::string("max"); | |
236 | } else if (min_tag == value) { | |
237 | return std::string("min"); | |
238 | } else { | |
239 | return format_time(value, tag_modulo); | |
240 | } | |
241 | } | |
242 | ||
243 | private: | |
244 | ||
31f18b77 | 245 | static double tag_calc(const Time time, |
11fdf7f2 TL |
246 | const double prev, |
247 | const double increment, | |
248 | const uint32_t dist_req_val, | |
249 | const bool extreme_is_high, | |
250 | const Cost cost) { | |
7c673cae FG |
251 | if (0.0 == increment) { |
252 | return extreme_is_high ? max_tag : min_tag; | |
253 | } else { | |
11fdf7f2 TL |
254 | // insure 64-bit arithmetic before conversion to double |
255 | double tag_increment = increment * (uint64_t(dist_req_val) + cost); | |
256 | return std::max(time, prev + tag_increment); | |
7c673cae FG |
257 | } |
258 | } | |
259 | ||
260 | friend std::ostream& operator<<(std::ostream& out, | |
261 | const RequestTag& tag) { | |
262 | out << | |
263 | "{ RequestTag:: ready:" << (tag.ready ? "true" : "false") << | |
264 | " r:" << format_tag(tag.reservation) << | |
265 | " p:" << format_tag(tag.proportion) << | |
266 | " l:" << format_tag(tag.limit) << | |
267 | #if 0 // try to resolve this to make sure Time is operator<<'able. | |
7c673cae | 268 | " arrival:" << tag.arrival << |
7c673cae FG |
269 | #endif |
270 | " }"; | |
271 | return out; | |
272 | } | |
273 | }; // class RequestTag | |
274 | ||
11fdf7f2 TL |
275 | // C is client identifier type, R is request type, |
276 | // IsDelayed controls whether tag calculation is delayed until the request | |
277 | // reaches the front of its queue. This is an optimization over the | |
278 | // originally published dmclock algorithm, allowing it to use the most | |
279 | // recent values of rho and delta. | |
280 | // U1 determines whether to use client information function dynamically, | |
281 | // B is heap branching factor | |
282 | template<typename C, typename R, bool IsDelayed, bool U1, uint B> | |
7c673cae | 283 | class PriorityQueueBase { |
d2e6a577 FG |
284 | // we don't want to include gtest.h just for FRIEND_TEST |
285 | friend class dmclock_server_client_idle_erase_Test; | |
7c673cae | 286 | |
11fdf7f2 TL |
287 | // types used for tag dispatch to select between implementations |
288 | using TagCalc = std::integral_constant<bool, IsDelayed>; | |
289 | using DelayedTagCalc = std::true_type; | |
290 | using ImmediateTagCalc = std::false_type; | |
291 | ||
7c673cae FG |
292 | public: |
293 | ||
294 | using RequestRef = std::unique_ptr<R>; | |
295 | ||
296 | protected: | |
297 | ||
298 | using TimePoint = decltype(std::chrono::steady_clock::now()); | |
299 | using Duration = std::chrono::milliseconds; | |
300 | using MarkPoint = std::pair<TimePoint,Counter>; | |
301 | ||
302 | enum class ReadyOption {ignore, lowers, raises}; | |
303 | ||
304 | // forward decl for friend decls | |
305 | template<double RequestTag::*, ReadyOption, bool> | |
306 | struct ClientCompare; | |
307 | ||
308 | class ClientReq { | |
309 | friend PriorityQueueBase; | |
310 | ||
311 | RequestTag tag; | |
312 | C client_id; | |
313 | RequestRef request; | |
314 | ||
315 | public: | |
316 | ||
317 | ClientReq(const RequestTag& _tag, | |
318 | const C& _client_id, | |
319 | RequestRef&& _request) : | |
320 | tag(_tag), | |
321 | client_id(_client_id), | |
322 | request(std::move(_request)) | |
323 | { | |
324 | // empty | |
325 | } | |
326 | ||
327 | friend std::ostream& operator<<(std::ostream& out, const ClientReq& c) { | |
328 | out << "{ ClientReq:: tag:" << c.tag << " client:" << | |
329 | c.client_id << " }"; | |
330 | return out; | |
331 | } | |
332 | }; // class ClientReq | |
333 | ||
11fdf7f2 TL |
334 | struct RequestMeta { |
335 | C client_id; | |
336 | RequestTag tag; | |
337 | ||
338 | RequestMeta(const C& _client_id, const RequestTag& _tag) : | |
339 | client_id(_client_id), | |
340 | tag(_tag) | |
341 | { | |
342 | // empty | |
343 | } | |
344 | }; | |
345 | ||
7c673cae FG |
346 | public: |
347 | ||
348 | // NOTE: ClientRec is in the "public" section for compatibility | |
349 | // with g++ 4.8.4, which complains if it's not. By g++ 6.3.1 | |
350 | // ClientRec could be "protected" with no issue. [See comments | |
351 | // associated with function submit_top_request.] | |
352 | class ClientRec { | |
11fdf7f2 | 353 | friend PriorityQueueBase<C,R,IsDelayed,U1,B>; |
7c673cae FG |
354 | |
355 | C client; | |
356 | RequestTag prev_tag; | |
357 | std::deque<ClientReq> requests; | |
358 | ||
359 | // amount added from the proportion tag as a result of | |
360 | // an idle client becoming unidle | |
361 | double prop_delta = 0.0; | |
362 | ||
11fdf7f2 TL |
363 | c::IndIntruHeapData reserv_heap_data {}; |
364 | c::IndIntruHeapData lim_heap_data {}; | |
365 | c::IndIntruHeapData ready_heap_data {}; | |
7c673cae | 366 | #if USE_PROP_HEAP |
11fdf7f2 | 367 | c::IndIntruHeapData prop_heap_data {}; |
7c673cae FG |
368 | #endif |
369 | ||
370 | public: | |
371 | ||
11fdf7f2 | 372 | const ClientInfo* info; |
7c673cae FG |
373 | bool idle; |
374 | Counter last_tick; | |
375 | uint32_t cur_rho; | |
376 | uint32_t cur_delta; | |
377 | ||
378 | ClientRec(C _client, | |
11fdf7f2 | 379 | const ClientInfo* _info, |
7c673cae FG |
380 | Counter current_tick) : |
381 | client(_client), | |
382 | prev_tag(0.0, 0.0, 0.0, TimeZero), | |
383 | info(_info), | |
384 | idle(true), | |
385 | last_tick(current_tick), | |
386 | cur_rho(1), | |
387 | cur_delta(1) | |
388 | { | |
389 | // empty | |
390 | } | |
391 | ||
392 | inline const RequestTag& get_req_tag() const { | |
393 | return prev_tag; | |
394 | } | |
395 | ||
396 | static inline void assign_unpinned_tag(double& lhs, const double rhs) { | |
397 | if (rhs != max_tag && rhs != min_tag) { | |
398 | lhs = rhs; | |
399 | } | |
400 | } | |
401 | ||
402 | inline void update_req_tag(const RequestTag& _prev, | |
403 | const Counter& _tick) { | |
404 | assign_unpinned_tag(prev_tag.reservation, _prev.reservation); | |
405 | assign_unpinned_tag(prev_tag.limit, _prev.limit); | |
406 | assign_unpinned_tag(prev_tag.proportion, _prev.proportion); | |
11fdf7f2 | 407 | prev_tag.arrival = _prev.arrival; |
7c673cae FG |
408 | last_tick = _tick; |
409 | } | |
410 | ||
11fdf7f2 TL |
411 | inline void add_request(const RequestTag& tag, RequestRef&& request) { |
412 | requests.emplace_back(tag, client, std::move(request)); | |
7c673cae FG |
413 | } |
414 | ||
415 | inline const ClientReq& next_request() const { | |
416 | return requests.front(); | |
417 | } | |
418 | ||
419 | inline ClientReq& next_request() { | |
420 | return requests.front(); | |
421 | } | |
422 | ||
423 | inline void pop_request() { | |
424 | requests.pop_front(); | |
425 | } | |
426 | ||
427 | inline bool has_request() const { | |
428 | return !requests.empty(); | |
429 | } | |
430 | ||
431 | inline size_t request_count() const { | |
432 | return requests.size(); | |
433 | } | |
434 | ||
435 | // NB: because a deque is the underlying structure, this | |
436 | // operation might be expensive | |
11fdf7f2 | 437 | bool remove_by_req_filter_fw(std::function<bool(RequestRef&&)> filter_accum) { |
7c673cae FG |
438 | bool any_removed = false; |
439 | for (auto i = requests.begin(); | |
440 | i != requests.end(); | |
441 | /* no inc */) { | |
11fdf7f2 | 442 | if (filter_accum(std::move(i->request))) { |
7c673cae FG |
443 | any_removed = true; |
444 | i = requests.erase(i); | |
445 | } else { | |
446 | ++i; | |
447 | } | |
448 | } | |
449 | return any_removed; | |
450 | } | |
451 | ||
452 | // NB: because a deque is the underlying structure, this | |
453 | // operation might be expensive | |
11fdf7f2 | 454 | bool remove_by_req_filter_bw(std::function<bool(RequestRef&&)> filter_accum) { |
7c673cae FG |
455 | bool any_removed = false; |
456 | for (auto i = requests.rbegin(); | |
457 | i != requests.rend(); | |
458 | /* no inc */) { | |
11fdf7f2 | 459 | if (filter_accum(std::move(i->request))) { |
7c673cae FG |
460 | any_removed = true; |
461 | i = decltype(i){ requests.erase(std::next(i).base()) }; | |
462 | } else { | |
463 | ++i; | |
464 | } | |
465 | } | |
466 | return any_removed; | |
467 | } | |
468 | ||
469 | inline bool | |
11fdf7f2 | 470 | remove_by_req_filter(std::function<bool(RequestRef&&)> filter_accum, |
7c673cae FG |
471 | bool visit_backwards) { |
472 | if (visit_backwards) { | |
473 | return remove_by_req_filter_bw(filter_accum); | |
474 | } else { | |
475 | return remove_by_req_filter_fw(filter_accum); | |
476 | } | |
477 | } | |
478 | ||
479 | friend std::ostream& | |
480 | operator<<(std::ostream& out, | |
11fdf7f2 | 481 | const typename PriorityQueueBase::ClientRec& e) { |
7c673cae FG |
482 | out << "{ ClientRec::" << |
483 | " client:" << e.client << | |
484 | " prev_tag:" << e.prev_tag << | |
485 | " req_count:" << e.requests.size() << | |
486 | " top_req:"; | |
487 | if (e.has_request()) { | |
488 | out << e.next_request(); | |
489 | } else { | |
490 | out << "none"; | |
491 | } | |
492 | out << " }"; | |
493 | ||
494 | return out; | |
495 | } | |
496 | }; // class ClientRec | |
497 | ||
498 | using ClientRecRef = std::shared_ptr<ClientRec>; | |
499 | ||
500 | // when we try to get the next request, we'll be in one of three | |
501 | // situations -- we'll have one to return, have one that can | |
502 | // fire in the future, or not have any | |
503 | enum class NextReqType { returning, future, none }; | |
504 | ||
505 | // specifies which queue next request will get popped from | |
506 | enum class HeapId { reservation, ready }; | |
507 | ||
508 | // this is returned from next_req to tell the caller the situation | |
509 | struct NextReq { | |
510 | NextReqType type; | |
511 | union { | |
512 | HeapId heap_id; | |
513 | Time when_ready; | |
514 | }; | |
11fdf7f2 TL |
515 | |
516 | inline explicit NextReq() : | |
517 | type(NextReqType::none) | |
518 | { } | |
519 | ||
520 | inline NextReq(HeapId _heap_id) : | |
521 | type(NextReqType::returning), | |
522 | heap_id(_heap_id) | |
523 | { } | |
524 | ||
525 | inline NextReq(Time _when_ready) : | |
526 | type(NextReqType::future), | |
527 | when_ready(_when_ready) | |
528 | { } | |
529 | ||
530 | // calls to this are clearer than calls to the default | |
531 | // constructor | |
532 | static inline NextReq none() { | |
533 | return NextReq(); | |
534 | } | |
7c673cae FG |
535 | }; |
536 | ||
537 | ||
538 | // a function that can be called to look up client information | |
11fdf7f2 | 539 | using ClientInfoFunc = std::function<const ClientInfo*(const C&)>; |
7c673cae FG |
540 | |
541 | ||
542 | bool empty() const { | |
543 | DataGuard g(data_mtx); | |
544 | return (resv_heap.empty() || ! resv_heap.top().has_request()); | |
545 | } | |
546 | ||
547 | ||
548 | size_t client_count() const { | |
549 | DataGuard g(data_mtx); | |
550 | return resv_heap.size(); | |
551 | } | |
552 | ||
553 | ||
554 | size_t request_count() const { | |
555 | DataGuard g(data_mtx); | |
556 | size_t total = 0; | |
557 | for (auto i = resv_heap.cbegin(); i != resv_heap.cend(); ++i) { | |
558 | total += i->request_count(); | |
559 | } | |
560 | return total; | |
561 | } | |
562 | ||
563 | ||
11fdf7f2 | 564 | bool remove_by_req_filter(std::function<bool(RequestRef&&)> filter_accum, |
7c673cae FG |
565 | bool visit_backwards = false) { |
566 | bool any_removed = false; | |
567 | DataGuard g(data_mtx); | |
568 | for (auto i : client_map) { | |
569 | bool modified = | |
570 | i.second->remove_by_req_filter(filter_accum, visit_backwards); | |
571 | if (modified) { | |
572 | resv_heap.adjust(*i.second); | |
573 | limit_heap.adjust(*i.second); | |
574 | ready_heap.adjust(*i.second); | |
575 | #if USE_PROP_HEAP | |
576 | prop_heap.adjust(*i.second); | |
577 | #endif | |
578 | any_removed = true; | |
579 | } | |
580 | } | |
581 | return any_removed; | |
582 | } | |
583 | ||
584 | ||
585 | // use as a default value when no accumulator is provide | |
11fdf7f2 | 586 | static void request_sink(RequestRef&& req) { |
7c673cae FG |
587 | // do nothing |
588 | } | |
589 | ||
590 | ||
591 | void remove_by_client(const C& client, | |
592 | bool reverse = false, | |
11fdf7f2 | 593 | std::function<void (RequestRef&&)> accum = request_sink) { |
7c673cae FG |
594 | DataGuard g(data_mtx); |
595 | ||
596 | auto i = client_map.find(client); | |
597 | ||
598 | if (i == client_map.end()) return; | |
599 | ||
600 | if (reverse) { | |
601 | for (auto j = i->second->requests.rbegin(); | |
602 | j != i->second->requests.rend(); | |
603 | ++j) { | |
11fdf7f2 | 604 | accum(std::move(j->request)); |
7c673cae FG |
605 | } |
606 | } else { | |
607 | for (auto j = i->second->requests.begin(); | |
608 | j != i->second->requests.end(); | |
609 | ++j) { | |
11fdf7f2 | 610 | accum(std::move(j->request)); |
7c673cae FG |
611 | } |
612 | } | |
613 | ||
614 | i->second->requests.clear(); | |
615 | ||
616 | resv_heap.adjust(*i->second); | |
617 | limit_heap.adjust(*i->second); | |
618 | ready_heap.adjust(*i->second); | |
619 | #if USE_PROP_HEAP | |
620 | prop_heap.adjust(*i->second); | |
621 | #endif | |
622 | } | |
623 | ||
624 | ||
625 | uint get_heap_branching_factor() const { | |
626 | return B; | |
627 | } | |
628 | ||
629 | ||
11fdf7f2 TL |
630 | void update_client_info(const C& client_id) { |
631 | DataGuard g(data_mtx); | |
632 | auto client_it = client_map.find(client_id); | |
633 | if (client_map.end() != client_it) { | |
634 | ClientRec& client = (*client_it->second); | |
635 | client.info = client_info_f(client_id); | |
636 | } | |
637 | } | |
638 | ||
639 | ||
640 | void update_client_infos() { | |
641 | DataGuard g(data_mtx); | |
642 | for (auto i : client_map) { | |
643 | i.second->info = client_info_f(i.second->client); | |
644 | } | |
645 | } | |
646 | ||
647 | ||
7c673cae FG |
648 | friend std::ostream& operator<<(std::ostream& out, |
649 | const PriorityQueueBase& q) { | |
650 | std::lock_guard<decltype(q.data_mtx)> guard(q.data_mtx); | |
651 | ||
652 | out << "{ PriorityQueue::"; | |
653 | for (const auto& c : q.client_map) { | |
654 | out << " { client:" << c.first << ", record:" << *c.second << | |
655 | " }"; | |
656 | } | |
657 | if (!q.resv_heap.empty()) { | |
658 | const auto& resv = q.resv_heap.top(); | |
659 | out << " { reservation_top:" << resv << " }"; | |
660 | const auto& ready = q.ready_heap.top(); | |
661 | out << " { ready_top:" << ready << " }"; | |
662 | const auto& limit = q.limit_heap.top(); | |
663 | out << " { limit_top:" << limit << " }"; | |
664 | } else { | |
665 | out << " HEAPS-EMPTY"; | |
666 | } | |
667 | out << " }"; | |
668 | ||
669 | return out; | |
670 | } | |
671 | ||
672 | // for debugging | |
673 | void display_queues(std::ostream& out, | |
674 | bool show_res = true, | |
675 | bool show_lim = true, | |
676 | bool show_ready = true, | |
677 | bool show_prop = true) const { | |
678 | auto filter = [](const ClientRec& e)->bool { return true; }; | |
679 | DataGuard g(data_mtx); | |
680 | if (show_res) { | |
681 | resv_heap.display_sorted(out << "RESER:", filter); | |
682 | } | |
683 | if (show_lim) { | |
684 | limit_heap.display_sorted(out << "LIMIT:", filter); | |
685 | } | |
686 | if (show_ready) { | |
687 | ready_heap.display_sorted(out << "READY:", filter); | |
688 | } | |
689 | #if USE_PROP_HEAP | |
690 | if (show_prop) { | |
691 | prop_heap.display_sorted(out << "PROPO:", filter); | |
692 | } | |
693 | #endif | |
694 | } // display_queues | |
695 | ||
696 | ||
697 | protected: | |
698 | ||
699 | // The ClientCompare functor is essentially doing a precedes? | |
700 | // operator, returning true if and only if the first parameter | |
701 | // must precede the second parameter. If the second must precede | |
702 | // the first, or if they are equivalent, false should be | |
703 | // returned. The reason for this behavior is that it will be | |
704 | // called to test if two items are out of order and if true is | |
705 | // returned it will reverse the items. Therefore false is the | |
706 | // default return when it doesn't matter to prevent unnecessary | |
707 | // re-ordering. | |
708 | // | |
709 | // The template is supporting variations in sorting based on the | |
710 | // heap in question and allowing these variations to be handled | |
711 | // at compile-time. | |
712 | // | |
713 | // tag_field determines which tag is being used for comparison | |
714 | // | |
715 | // ready_opt determines how the ready flag influences the sort | |
716 | // | |
717 | // use_prop_delta determines whether the proportional delta is | |
718 | // added in for comparison | |
719 | template<double RequestTag::*tag_field, | |
720 | ReadyOption ready_opt, | |
721 | bool use_prop_delta> | |
722 | struct ClientCompare { | |
723 | bool operator()(const ClientRec& n1, const ClientRec& n2) const { | |
724 | if (n1.has_request()) { | |
725 | if (n2.has_request()) { | |
726 | const auto& t1 = n1.next_request().tag; | |
727 | const auto& t2 = n2.next_request().tag; | |
728 | if (ReadyOption::ignore == ready_opt || t1.ready == t2.ready) { | |
729 | // if we don't care about ready or the ready values are the same | |
730 | if (use_prop_delta) { | |
731 | return (t1.*tag_field + n1.prop_delta) < | |
732 | (t2.*tag_field + n2.prop_delta); | |
733 | } else { | |
734 | return t1.*tag_field < t2.*tag_field; | |
735 | } | |
736 | } else if (ReadyOption::raises == ready_opt) { | |
737 | // use_ready == true && the ready fields are different | |
738 | return t1.ready; | |
739 | } else { | |
740 | return t2.ready; | |
741 | } | |
742 | } else { | |
743 | // n1 has request but n2 does not | |
744 | return true; | |
745 | } | |
746 | } else if (n2.has_request()) { | |
747 | // n2 has request but n1 does not | |
748 | return false; | |
749 | } else { | |
750 | // both have none; keep stable w false | |
751 | return false; | |
752 | } | |
753 | } | |
754 | }; | |
755 | ||
11fdf7f2 TL |
756 | ClientInfoFunc client_info_f; |
757 | static constexpr bool is_dynamic_cli_info_f = U1; | |
7c673cae FG |
758 | |
759 | mutable std::mutex data_mtx; | |
760 | using DataGuard = std::lock_guard<decltype(data_mtx)>; | |
761 | ||
762 | // stable mapping between client ids and client queues | |
763 | std::map<C,ClientRecRef> client_map; | |
764 | ||
765 | c::IndIntruHeap<ClientRecRef, | |
766 | ClientRec, | |
767 | &ClientRec::reserv_heap_data, | |
768 | ClientCompare<&RequestTag::reservation, | |
769 | ReadyOption::ignore, | |
770 | false>, | |
771 | B> resv_heap; | |
772 | #if USE_PROP_HEAP | |
773 | c::IndIntruHeap<ClientRecRef, | |
774 | ClientRec, | |
775 | &ClientRec::prop_heap_data, | |
776 | ClientCompare<&RequestTag::proportion, | |
777 | ReadyOption::ignore, | |
778 | true>, | |
779 | B> prop_heap; | |
780 | #endif | |
781 | c::IndIntruHeap<ClientRecRef, | |
782 | ClientRec, | |
783 | &ClientRec::lim_heap_data, | |
784 | ClientCompare<&RequestTag::limit, | |
785 | ReadyOption::lowers, | |
786 | false>, | |
787 | B> limit_heap; | |
788 | c::IndIntruHeap<ClientRecRef, | |
789 | ClientRec, | |
790 | &ClientRec::ready_heap_data, | |
791 | ClientCompare<&RequestTag::proportion, | |
792 | ReadyOption::raises, | |
793 | true>, | |
794 | B> ready_heap; | |
795 | ||
11fdf7f2 TL |
796 | AtLimit at_limit; |
797 | RejectThreshold reject_threshold = 0; | |
798 | ||
799 | double anticipation_timeout; | |
7c673cae FG |
800 | |
801 | std::atomic_bool finishing; | |
802 | ||
803 | // every request creates a tick | |
804 | Counter tick = 0; | |
805 | ||
806 | // performance data collection | |
807 | size_t reserv_sched_count = 0; | |
808 | size_t prop_sched_count = 0; | |
809 | size_t limit_break_sched_count = 0; | |
810 | ||
811 | Duration idle_age; | |
812 | Duration erase_age; | |
813 | Duration check_time; | |
814 | std::deque<MarkPoint> clean_mark_points; | |
11fdf7f2 TL |
815 | // max number of clients to erase at a time |
816 | Counter erase_max; | |
817 | // unfinished last erase point | |
818 | Counter last_erase_point = 0; | |
7c673cae FG |
819 | |
820 | // NB: All threads declared at end, so they're destructed first! | |
821 | ||
822 | std::unique_ptr<RunEvery> cleaning_job; | |
823 | ||
11fdf7f2 TL |
824 | // helper function to return the value of a variant if it matches the |
825 | // given type T, or a default value of T otherwise | |
826 | template <typename T, typename Variant> | |
827 | static T get_or_default(const Variant& param, T default_value) { | |
828 | const T *p = boost::get<T>(¶m); | |
829 | return p ? *p : default_value; | |
830 | } | |
7c673cae FG |
831 | |
832 | // COMMON constructor that others feed into; we can accept three | |
833 | // different variations of durations | |
834 | template<typename Rep, typename Per> | |
835 | PriorityQueueBase(ClientInfoFunc _client_info_f, | |
836 | std::chrono::duration<Rep,Per> _idle_age, | |
837 | std::chrono::duration<Rep,Per> _erase_age, | |
838 | std::chrono::duration<Rep,Per> _check_time, | |
11fdf7f2 TL |
839 | AtLimitParam at_limit_param, |
840 | double _anticipation_timeout) : | |
7c673cae | 841 | client_info_f(_client_info_f), |
11fdf7f2 TL |
842 | at_limit(get_or_default(at_limit_param, AtLimit::Reject)), |
843 | reject_threshold(get_or_default(at_limit_param, RejectThreshold{0})), | |
844 | anticipation_timeout(_anticipation_timeout), | |
7c673cae FG |
845 | finishing(false), |
846 | idle_age(std::chrono::duration_cast<Duration>(_idle_age)), | |
847 | erase_age(std::chrono::duration_cast<Duration>(_erase_age)), | |
11fdf7f2 TL |
848 | check_time(std::chrono::duration_cast<Duration>(_check_time)), |
849 | erase_max(standard_erase_max) | |
7c673cae FG |
850 | { |
851 | assert(_erase_age >= _idle_age); | |
852 | assert(_check_time < _idle_age); | |
11fdf7f2 TL |
853 | // AtLimit::Reject depends on ImmediateTagCalc |
854 | assert(at_limit != AtLimit::Reject || !IsDelayed); | |
7c673cae FG |
855 | cleaning_job = |
856 | std::unique_ptr<RunEvery>( | |
857 | new RunEvery(check_time, | |
858 | std::bind(&PriorityQueueBase::do_clean, this))); | |
859 | } | |
860 | ||
861 | ||
862 | ~PriorityQueueBase() { | |
863 | finishing = true; | |
864 | } | |
865 | ||
866 | ||
11fdf7f2 TL |
867 | inline const ClientInfo* get_cli_info(ClientRec& client) const { |
868 | if (is_dynamic_cli_info_f) { | |
869 | client.info = client_info_f(client.client); | |
870 | } | |
871 | return client.info; | |
872 | } | |
873 | ||
7c673cae | 874 | // data_mtx must be held by caller |
11fdf7f2 TL |
875 | RequestTag initial_tag(DelayedTagCalc delayed, ClientRec& client, |
876 | const ReqParams& params, Time time, Cost cost) { | |
877 | RequestTag tag(0, 0, 0, time, 0, 0, cost); | |
7c673cae | 878 | |
11fdf7f2 TL |
879 | // only calculate a tag if the request is going straight to the front |
880 | if (!client.has_request()) { | |
881 | const ClientInfo* client_info = get_cli_info(client); | |
882 | assert(client_info); | |
883 | tag = RequestTag(client.get_req_tag(), *client_info, | |
884 | params, time, cost, anticipation_timeout); | |
7c673cae | 885 | |
11fdf7f2 TL |
886 | // copy tag to previous tag for client |
887 | client.update_req_tag(tag, tick); | |
888 | } | |
889 | return tag; | |
890 | } | |
891 | ||
892 | // data_mtx must be held by caller | |
893 | RequestTag initial_tag(ImmediateTagCalc imm, ClientRec& client, | |
894 | const ReqParams& params, Time time, Cost cost) { | |
895 | // calculate the tag unconditionally | |
896 | const ClientInfo* client_info = get_cli_info(client); | |
897 | assert(client_info); | |
898 | RequestTag tag(client.get_req_tag(), *client_info, | |
899 | params, time, cost, anticipation_timeout); | |
900 | ||
901 | // copy tag to previous tag for client | |
902 | client.update_req_tag(tag, tick); | |
903 | return tag; | |
904 | } | |
905 | ||
906 | // data_mtx must be held by caller. returns 0 on success. when using | |
907 | // AtLimit::Reject, requests that would exceed their limit are rejected | |
908 | // with EAGAIN, and the queue will not take ownership of the given | |
909 | // 'request' argument | |
910 | int do_add_request(RequestRef&& request, | |
911 | const C& client_id, | |
912 | const ReqParams& req_params, | |
913 | const Time time, | |
914 | const Cost cost = 1u) { | |
915 | ++tick; | |
916 | ||
917 | auto insert = client_map.emplace(client_id, ClientRecRef{}); | |
918 | if (insert.second) { | |
919 | // new client entry | |
920 | const ClientInfo* info = client_info_f(client_id); | |
921 | auto client_rec = std::make_shared<ClientRec>(client_id, info, tick); | |
7c673cae FG |
922 | resv_heap.push(client_rec); |
923 | #if USE_PROP_HEAP | |
924 | prop_heap.push(client_rec); | |
925 | #endif | |
926 | limit_heap.push(client_rec); | |
927 | ready_heap.push(client_rec); | |
11fdf7f2 | 928 | insert.first->second = std::move(client_rec); |
7c673cae FG |
929 | } |
930 | ||
931 | // for convenience, we'll create a reference to the shared pointer | |
11fdf7f2 | 932 | ClientRec& client = *insert.first->second; |
7c673cae FG |
933 | |
934 | if (client.idle) { | |
935 | // We need to do an adjustment so that idle clients compete | |
936 | // fairly on proportional tags since those tags may have | |
937 | // drifted from real-time. Either use the lowest existing | |
938 | // proportion tag -- O(1) -- or the client with the lowest | |
939 | // previous proportion tag -- O(n) where n = # clients. | |
940 | // | |
941 | // So we don't have to maintain a propotional queue that | |
942 | // keeps the minimum on proportional tag alone (we're | |
943 | // instead using a ready queue), we'll have to check each | |
944 | // client. | |
945 | // | |
946 | // The alternative would be to maintain a proportional queue | |
947 | // (define USE_PROP_TAG) and do an O(1) operation here. | |
948 | ||
949 | // Was unable to confirm whether equality testing on | |
950 | // std::numeric_limits<double>::max() is guaranteed, so | |
951 | // we'll use a compile-time calculated trigger that is one | |
952 | // third the max, which should be much larger than any | |
953 | // expected organic value. | |
954 | constexpr double lowest_prop_tag_trigger = | |
955 | std::numeric_limits<double>::max() / 3.0; | |
956 | ||
957 | double lowest_prop_tag = std::numeric_limits<double>::max(); | |
958 | for (auto const &c : client_map) { | |
959 | // don't use ourselves (or anything else that might be | |
960 | // listed as idle) since we're now in the map | |
961 | if (!c.second->idle) { | |
962 | double p; | |
963 | // use either lowest proportion tag or previous proportion tag | |
964 | if (c.second->has_request()) { | |
965 | p = c.second->next_request().tag.proportion + | |
966 | c.second->prop_delta; | |
967 | } else { | |
968 | p = c.second->get_req_tag().proportion + c.second->prop_delta; | |
969 | } | |
970 | ||
971 | if (p < lowest_prop_tag) { | |
972 | lowest_prop_tag = p; | |
973 | } | |
974 | } | |
975 | } | |
976 | ||
977 | // if this conditional does not fire, it | |
978 | if (lowest_prop_tag < lowest_prop_tag_trigger) { | |
979 | client.prop_delta = lowest_prop_tag - time; | |
980 | } | |
981 | client.idle = false; | |
982 | } // if this client was idle | |
983 | ||
11fdf7f2 | 984 | RequestTag tag = initial_tag(TagCalc{}, client, req_params, time, cost); |
7c673cae | 985 | |
11fdf7f2 TL |
986 | if (at_limit == AtLimit::Reject && |
987 | tag.limit > time + reject_threshold) { | |
988 | // if the client is over its limit, reject it here | |
989 | return EAGAIN; | |
7c673cae | 990 | } |
7c673cae | 991 | |
11fdf7f2 | 992 | client.add_request(tag, std::move(request)); |
7c673cae FG |
993 | if (1 == client.requests.size()) { |
994 | // NB: can the following 4 calls to adjust be changed | |
995 | // promote? Can adding a request ever demote a client in the | |
996 | // heaps? | |
997 | resv_heap.adjust(client); | |
998 | limit_heap.adjust(client); | |
999 | ready_heap.adjust(client); | |
1000 | #if USE_PROP_HEAP | |
1001 | prop_heap.adjust(client); | |
1002 | #endif | |
1003 | } | |
1004 | ||
1005 | client.cur_rho = req_params.rho; | |
1006 | client.cur_delta = req_params.delta; | |
1007 | ||
1008 | resv_heap.adjust(client); | |
1009 | limit_heap.adjust(client); | |
1010 | ready_heap.adjust(client); | |
1011 | #if USE_PROP_HEAP | |
1012 | prop_heap.adjust(client); | |
1013 | #endif | |
11fdf7f2 TL |
1014 | return 0; |
1015 | } // do_add_request | |
1016 | ||
1017 | // data_mtx must be held by caller | |
1018 | void update_next_tag(DelayedTagCalc delayed, ClientRec& top, | |
1019 | const RequestTag& tag) { | |
1020 | if (top.has_request()) { | |
1021 | // perform delayed tag calculation on the next request | |
1022 | ClientReq& next_first = top.next_request(); | |
1023 | const ClientInfo* client_info = get_cli_info(top); | |
1024 | assert(client_info); | |
1025 | next_first.tag = RequestTag(tag, *client_info, | |
1026 | top.cur_delta, top.cur_rho, | |
1027 | next_first.tag.arrival, | |
1028 | next_first.tag.cost, | |
1029 | anticipation_timeout); | |
1030 | // copy tag to previous tag for client | |
1031 | top.update_req_tag(next_first.tag, tick); | |
1032 | } | |
1033 | } | |
7c673cae | 1034 | |
11fdf7f2 TL |
1035 | void update_next_tag(ImmediateTagCalc imm, ClientRec& top, |
1036 | const RequestTag& tag) { | |
1037 | // the next tag was already calculated on insertion | |
1038 | } | |
7c673cae FG |
1039 | |
1040 | // data_mtx should be held when called; top of heap should have | |
1041 | // a ready request | |
1042 | template<typename C1, IndIntruHeapData ClientRec::*C2, typename C3> | |
11fdf7f2 | 1043 | RequestTag pop_process_request(IndIntruHeap<C1, ClientRec, C2, C3, B>& heap, |
7c673cae | 1044 | std::function<void(const C& client, |
11fdf7f2 | 1045 | const Cost cost, |
7c673cae FG |
1046 | RequestRef& request)> process) { |
1047 | // gain access to data | |
1048 | ClientRec& top = heap.top(); | |
31f18b77 | 1049 | |
11fdf7f2 | 1050 | Cost request_cost = top.next_request().tag.cost; |
31f18b77 FG |
1051 | RequestRef request = std::move(top.next_request().request); |
1052 | RequestTag tag = top.next_request().tag; | |
7c673cae FG |
1053 | |
1054 | // pop request and adjust heaps | |
1055 | top.pop_request(); | |
1056 | ||
11fdf7f2 | 1057 | update_next_tag(TagCalc{}, top, tag); |
7c673cae FG |
1058 | |
1059 | resv_heap.demote(top); | |
1060 | limit_heap.adjust(top); | |
1061 | #if USE_PROP_HEAP | |
1062 | prop_heap.demote(top); | |
1063 | #endif | |
1064 | ready_heap.demote(top); | |
1065 | ||
1066 | // process | |
11fdf7f2 TL |
1067 | process(top.client, request_cost, request); |
1068 | ||
1069 | return tag; | |
7c673cae FG |
1070 | } // pop_process_request |
1071 | ||
1072 | ||
11fdf7f2 TL |
1073 | // data_mtx must be held by caller |
1074 | void reduce_reservation_tags(DelayedTagCalc delayed, ClientRec& client, | |
1075 | const RequestTag& tag) { | |
1076 | if (!client.requests.empty()) { | |
1077 | // only maintain a tag for the first request | |
1078 | auto& r = client.requests.front(); | |
1079 | r.tag.reservation -= | |
1080 | client.info->reservation_inv * std::max(uint32_t(1), tag.rho); | |
1081 | } | |
1082 | } | |
1083 | ||
7c673cae | 1084 | // data_mtx should be held when called |
11fdf7f2 TL |
1085 | void reduce_reservation_tags(ImmediateTagCalc imm, ClientRec& client, |
1086 | const RequestTag& tag) { | |
1087 | double res_offset = | |
1088 | client.info->reservation_inv * std::max(uint32_t(1), tag.rho); | |
7c673cae | 1089 | for (auto& r : client.requests) { |
11fdf7f2 | 1090 | r.tag.reservation -= res_offset; |
7c673cae | 1091 | } |
7c673cae FG |
1092 | } |
1093 | ||
7c673cae | 1094 | // data_mtx should be held when called |
11fdf7f2 | 1095 | void reduce_reservation_tags(const C& client_id, const RequestTag& tag) { |
7c673cae FG |
1096 | auto client_it = client_map.find(client_id); |
1097 | ||
1098 | // means the client was cleaned from map; should never happen | |
1099 | // as long as cleaning times are long enough | |
1100 | assert(client_map.end() != client_it); | |
11fdf7f2 TL |
1101 | ClientRec& client = *client_it->second; |
1102 | reduce_reservation_tags(TagCalc{}, client, tag); | |
1103 | ||
1104 | // don't forget to update previous tag | |
1105 | client.prev_tag.reservation -= | |
1106 | client.info->reservation_inv * std::max(uint32_t(1), tag.rho); | |
1107 | resv_heap.promote(client); | |
7c673cae FG |
1108 | } |
1109 | ||
1110 | ||
1111 | // data_mtx should be held when called | |
1112 | NextReq do_next_request(Time now) { | |
11fdf7f2 TL |
1113 | // if reservation queue is empty, all are empty (i.e., no |
1114 | // active clients) | |
7c673cae | 1115 | if(resv_heap.empty()) { |
11fdf7f2 | 1116 | return NextReq::none(); |
7c673cae FG |
1117 | } |
1118 | ||
1119 | // try constraint (reservation) based scheduling | |
1120 | ||
1121 | auto& reserv = resv_heap.top(); | |
1122 | if (reserv.has_request() && | |
1123 | reserv.next_request().tag.reservation <= now) { | |
11fdf7f2 | 1124 | return NextReq(HeapId::reservation); |
7c673cae FG |
1125 | } |
1126 | ||
1127 | // no existing reservations before now, so try weight-based | |
1128 | // scheduling | |
1129 | ||
1130 | // all items that are within limit are eligible based on | |
1131 | // priority | |
1132 | auto limits = &limit_heap.top(); | |
1133 | while (limits->has_request() && | |
1134 | !limits->next_request().tag.ready && | |
1135 | limits->next_request().tag.limit <= now) { | |
1136 | limits->next_request().tag.ready = true; | |
1137 | ready_heap.promote(*limits); | |
1138 | limit_heap.demote(*limits); | |
1139 | ||
1140 | limits = &limit_heap.top(); | |
1141 | } | |
1142 | ||
1143 | auto& readys = ready_heap.top(); | |
1144 | if (readys.has_request() && | |
1145 | readys.next_request().tag.ready && | |
1146 | readys.next_request().tag.proportion < max_tag) { | |
11fdf7f2 | 1147 | return NextReq(HeapId::ready); |
7c673cae FG |
1148 | } |
1149 | ||
1150 | // if nothing is schedulable by reservation or | |
1151 | // proportion/weight, and if we allow limit break, try to | |
1152 | // schedule something with the lowest proportion tag or | |
1153 | // alternatively lowest reservation tag. | |
11fdf7f2 | 1154 | if (at_limit == AtLimit::Allow) { |
7c673cae FG |
1155 | if (readys.has_request() && |
1156 | readys.next_request().tag.proportion < max_tag) { | |
11fdf7f2 | 1157 | return NextReq(HeapId::ready); |
7c673cae FG |
1158 | } else if (reserv.has_request() && |
1159 | reserv.next_request().tag.reservation < max_tag) { | |
11fdf7f2 | 1160 | return NextReq(HeapId::reservation); |
7c673cae FG |
1161 | } |
1162 | } | |
1163 | ||
1164 | // nothing scheduled; make sure we re-run when next | |
1165 | // reservation item or next limited item comes up | |
1166 | ||
1167 | Time next_call = TimeMax; | |
1168 | if (resv_heap.top().has_request()) { | |
1169 | next_call = | |
1170 | min_not_0_time(next_call, | |
1171 | resv_heap.top().next_request().tag.reservation); | |
1172 | } | |
1173 | if (limit_heap.top().has_request()) { | |
1174 | const auto& next = limit_heap.top().next_request(); | |
1175 | assert(!next.tag.ready || max_tag == next.tag.proportion); | |
1176 | next_call = min_not_0_time(next_call, next.tag.limit); | |
1177 | } | |
1178 | if (next_call < TimeMax) { | |
11fdf7f2 | 1179 | return NextReq(next_call); |
7c673cae | 1180 | } else { |
11fdf7f2 | 1181 | return NextReq::none(); |
7c673cae FG |
1182 | } |
1183 | } // do_next_request | |
1184 | ||
1185 | ||
1186 | // if possible is not zero and less than current then return it; | |
1187 | // otherwise return current; the idea is we're trying to find | |
1188 | // the minimal time but ignoring zero | |
1189 | static inline const Time& min_not_0_time(const Time& current, | |
1190 | const Time& possible) { | |
1191 | return TimeZero == possible ? current : std::min(current, possible); | |
1192 | } | |
1193 | ||
1194 | ||
1195 | /* | |
1196 | * This is being called regularly by RunEvery. Every time it's | |
1197 | * called it notes the time and delta counter (mark point) in a | |
1198 | * deque. It also looks at the deque to find the most recent | |
1199 | * mark point that is older than clean_age. It then walks the | |
1200 | * map and delete all server entries that were last used before | |
1201 | * that mark point. | |
1202 | */ | |
1203 | void do_clean() { | |
1204 | TimePoint now = std::chrono::steady_clock::now(); | |
1205 | DataGuard g(data_mtx); | |
1206 | clean_mark_points.emplace_back(MarkPoint(now, tick)); | |
1207 | ||
1208 | // first erase the super-old client records | |
1209 | ||
11fdf7f2 | 1210 | Counter erase_point = last_erase_point; |
7c673cae FG |
1211 | auto point = clean_mark_points.front(); |
1212 | while (point.first <= now - erase_age) { | |
11fdf7f2 TL |
1213 | last_erase_point = point.second; |
1214 | erase_point = last_erase_point; | |
7c673cae FG |
1215 | clean_mark_points.pop_front(); |
1216 | point = clean_mark_points.front(); | |
1217 | } | |
1218 | ||
1219 | Counter idle_point = 0; | |
1220 | for (auto i : clean_mark_points) { | |
1221 | if (i.first <= now - idle_age) { | |
1222 | idle_point = i.second; | |
1223 | } else { | |
1224 | break; | |
1225 | } | |
1226 | } | |
1227 | ||
11fdf7f2 | 1228 | Counter erased_num = 0; |
7c673cae FG |
1229 | if (erase_point > 0 || idle_point > 0) { |
1230 | for (auto i = client_map.begin(); i != client_map.end(); /* empty */) { | |
1231 | auto i2 = i++; | |
11fdf7f2 TL |
1232 | if (erase_point && |
1233 | erased_num < erase_max && | |
1234 | i2->second->last_tick <= erase_point) { | |
7c673cae FG |
1235 | delete_from_heaps(i2->second); |
1236 | client_map.erase(i2); | |
11fdf7f2 | 1237 | erased_num++; |
7c673cae FG |
1238 | } else if (idle_point && i2->second->last_tick <= idle_point) { |
1239 | i2->second->idle = true; | |
1240 | } | |
1241 | } // for | |
11fdf7f2 TL |
1242 | |
1243 | auto wperiod = check_time; | |
1244 | if (erased_num >= erase_max) { | |
1245 | wperiod = duration_cast<milliseconds>(aggressive_check_time); | |
1246 | } else { | |
1247 | // clean finished, refresh | |
1248 | last_erase_point = 0; | |
1249 | } | |
1250 | cleaning_job->try_update(wperiod); | |
7c673cae FG |
1251 | } // if |
1252 | } // do_clean | |
1253 | ||
1254 | ||
1255 | // data_mtx must be held by caller | |
1256 | template<IndIntruHeapData ClientRec::*C1,typename C2> | |
1257 | void delete_from_heap(ClientRecRef& client, | |
1258 | c::IndIntruHeap<ClientRecRef,ClientRec,C1,C2,B>& heap) { | |
11fdf7f2 | 1259 | auto i = heap.at(client); |
7c673cae FG |
1260 | heap.remove(i); |
1261 | } | |
1262 | ||
1263 | ||
1264 | // data_mtx must be held by caller | |
1265 | void delete_from_heaps(ClientRecRef& client) { | |
1266 | delete_from_heap(client, resv_heap); | |
1267 | #if USE_PROP_HEAP | |
1268 | delete_from_heap(client, prop_heap); | |
1269 | #endif | |
1270 | delete_from_heap(client, limit_heap); | |
1271 | delete_from_heap(client, ready_heap); | |
1272 | } | |
1273 | }; // class PriorityQueueBase | |
1274 | ||
1275 | ||
11fdf7f2 TL |
1276 | template<typename C, typename R, bool IsDelayed=false, bool U1=false, uint B=2> |
1277 | class PullPriorityQueue : public PriorityQueueBase<C,R,IsDelayed,U1,B> { | |
1278 | using super = PriorityQueueBase<C,R,IsDelayed,U1,B>; | |
7c673cae FG |
1279 | |
1280 | public: | |
1281 | ||
1282 | // When a request is pulled, this is the return type. | |
1283 | struct PullReq { | |
1284 | struct Retn { | |
11fdf7f2 TL |
1285 | C client; |
1286 | typename super::RequestRef request; | |
1287 | PhaseType phase; | |
1288 | Cost cost; | |
7c673cae FG |
1289 | }; |
1290 | ||
1291 | typename super::NextReqType type; | |
1292 | boost::variant<Retn,Time> data; | |
1293 | ||
1294 | bool is_none() const { return type == super::NextReqType::none; } | |
1295 | ||
1296 | bool is_retn() const { return type == super::NextReqType::returning; } | |
1297 | Retn& get_retn() { | |
1298 | return boost::get<Retn>(data); | |
1299 | } | |
1300 | ||
1301 | bool is_future() const { return type == super::NextReqType::future; } | |
1302 | Time getTime() const { return boost::get<Time>(data); } | |
1303 | }; | |
1304 | ||
1305 | ||
1306 | #ifdef PROFILE | |
1307 | ProfileTimer<std::chrono::nanoseconds> pull_request_timer; | |
1308 | ProfileTimer<std::chrono::nanoseconds> add_request_timer; | |
1309 | #endif | |
1310 | ||
1311 | template<typename Rep, typename Per> | |
1312 | PullPriorityQueue(typename super::ClientInfoFunc _client_info_f, | |
1313 | std::chrono::duration<Rep,Per> _idle_age, | |
1314 | std::chrono::duration<Rep,Per> _erase_age, | |
1315 | std::chrono::duration<Rep,Per> _check_time, | |
11fdf7f2 TL |
1316 | AtLimitParam at_limit_param = AtLimit::Wait, |
1317 | double _anticipation_timeout = 0.0) : | |
7c673cae FG |
1318 | super(_client_info_f, |
1319 | _idle_age, _erase_age, _check_time, | |
11fdf7f2 | 1320 | at_limit_param, _anticipation_timeout) |
7c673cae FG |
1321 | { |
1322 | // empty | |
1323 | } | |
1324 | ||
1325 | ||
1326 | // pull convenience constructor | |
1327 | PullPriorityQueue(typename super::ClientInfoFunc _client_info_f, | |
11fdf7f2 TL |
1328 | AtLimitParam at_limit_param = AtLimit::Wait, |
1329 | double _anticipation_timeout = 0.0) : | |
7c673cae | 1330 | PullPriorityQueue(_client_info_f, |
11fdf7f2 TL |
1331 | standard_idle_age, |
1332 | standard_erase_age, | |
1333 | standard_check_time, | |
1334 | at_limit_param, | |
1335 | _anticipation_timeout) | |
7c673cae FG |
1336 | { |
1337 | // empty | |
1338 | } | |
1339 | ||
1340 | ||
11fdf7f2 TL |
1341 | int add_request(R&& request, |
1342 | const C& client_id, | |
1343 | const ReqParams& req_params, | |
1344 | const Cost cost = 1u) { | |
1345 | return add_request(typename super::RequestRef(new R(std::move(request))), | |
1346 | client_id, | |
1347 | req_params, | |
1348 | get_time(), | |
1349 | cost); | |
7c673cae FG |
1350 | } |
1351 | ||
1352 | ||
11fdf7f2 TL |
1353 | int add_request(R&& request, |
1354 | const C& client_id, | |
1355 | const Cost cost = 1u) { | |
7c673cae | 1356 | static const ReqParams null_req_params; |
11fdf7f2 TL |
1357 | return add_request(typename super::RequestRef(new R(std::move(request))), |
1358 | client_id, | |
1359 | null_req_params, | |
1360 | get_time(), | |
1361 | cost); | |
7c673cae FG |
1362 | } |
1363 | ||
1364 | ||
11fdf7f2 TL |
1365 | int add_request_time(R&& request, |
1366 | const C& client_id, | |
1367 | const ReqParams& req_params, | |
1368 | const Time time, | |
1369 | const Cost cost = 1u) { | |
1370 | return add_request(typename super::RequestRef(new R(std::move(request))), | |
1371 | client_id, | |
1372 | req_params, | |
1373 | time, | |
1374 | cost); | |
7c673cae FG |
1375 | } |
1376 | ||
1377 | ||
11fdf7f2 TL |
1378 | int add_request(typename super::RequestRef&& request, |
1379 | const C& client_id, | |
1380 | const ReqParams& req_params, | |
1381 | const Cost cost = 1u) { | |
1382 | return add_request(request, req_params, client_id, get_time(), cost); | |
7c673cae FG |
1383 | } |
1384 | ||
1385 | ||
11fdf7f2 TL |
1386 | int add_request(typename super::RequestRef&& request, |
1387 | const C& client_id, | |
1388 | const Cost cost = 1u) { | |
7c673cae | 1389 | static const ReqParams null_req_params; |
11fdf7f2 | 1390 | return add_request(request, null_req_params, client_id, get_time(), cost); |
7c673cae FG |
1391 | } |
1392 | ||
1393 | ||
1394 | // this does the work; the versions above provide alternate interfaces | |
11fdf7f2 TL |
1395 | int add_request(typename super::RequestRef&& request, |
1396 | const C& client_id, | |
1397 | const ReqParams& req_params, | |
1398 | const Time time, | |
1399 | const Cost cost = 1u) { | |
7c673cae FG |
1400 | typename super::DataGuard g(this->data_mtx); |
1401 | #ifdef PROFILE | |
1402 | add_request_timer.start(); | |
1403 | #endif | |
11fdf7f2 TL |
1404 | int r = super::do_add_request(std::move(request), |
1405 | client_id, | |
1406 | req_params, | |
1407 | time, | |
1408 | cost); | |
7c673cae FG |
1409 | // no call to schedule_request for pull version |
1410 | #ifdef PROFILE | |
1411 | add_request_timer.stop(); | |
1412 | #endif | |
11fdf7f2 | 1413 | return r; |
7c673cae FG |
1414 | } |
1415 | ||
1416 | ||
1417 | inline PullReq pull_request() { | |
1418 | return pull_request(get_time()); | |
1419 | } | |
1420 | ||
1421 | ||
11fdf7f2 | 1422 | PullReq pull_request(const Time now) { |
7c673cae FG |
1423 | PullReq result; |
1424 | typename super::DataGuard g(this->data_mtx); | |
1425 | #ifdef PROFILE | |
1426 | pull_request_timer.start(); | |
1427 | #endif | |
1428 | ||
1429 | typename super::NextReq next = super::do_next_request(now); | |
1430 | result.type = next.type; | |
1431 | switch(next.type) { | |
1432 | case super::NextReqType::none: | |
1433 | return result; | |
7c673cae FG |
1434 | case super::NextReqType::future: |
1435 | result.data = next.when_ready; | |
1436 | return result; | |
7c673cae FG |
1437 | case super::NextReqType::returning: |
1438 | // to avoid nesting, break out and let code below handle this case | |
1439 | break; | |
1440 | default: | |
1441 | assert(false); | |
1442 | } | |
1443 | ||
1444 | // we'll only get here if we're returning an entry | |
1445 | ||
1446 | auto process_f = | |
1447 | [&] (PullReq& pull_result, PhaseType phase) -> | |
1448 | std::function<void(const C&, | |
11fdf7f2 | 1449 | uint64_t, |
7c673cae FG |
1450 | typename super::RequestRef&)> { |
1451 | return [&pull_result, phase](const C& client, | |
11fdf7f2 | 1452 | const Cost request_cost, |
7c673cae | 1453 | typename super::RequestRef& request) { |
11fdf7f2 TL |
1454 | pull_result.data = typename PullReq::Retn{ client, |
1455 | std::move(request), | |
1456 | phase, | |
1457 | request_cost }; | |
7c673cae FG |
1458 | }; |
1459 | }; | |
1460 | ||
1461 | switch(next.heap_id) { | |
1462 | case super::HeapId::reservation: | |
11fdf7f2 TL |
1463 | (void) super::pop_process_request(this->resv_heap, |
1464 | process_f(result, | |
1465 | PhaseType::reservation)); | |
7c673cae FG |
1466 | ++this->reserv_sched_count; |
1467 | break; | |
1468 | case super::HeapId::ready: | |
11fdf7f2 TL |
1469 | { |
1470 | auto tag = super::pop_process_request(this->ready_heap, | |
7c673cae | 1471 | process_f(result, PhaseType::priority)); |
11fdf7f2 | 1472 | // need to use retn temporarily |
7c673cae | 1473 | auto& retn = boost::get<typename PullReq::Retn>(result.data); |
11fdf7f2 | 1474 | super::reduce_reservation_tags(retn.client, tag); |
7c673cae FG |
1475 | } |
1476 | ++this->prop_sched_count; | |
1477 | break; | |
1478 | default: | |
1479 | assert(false); | |
1480 | } | |
1481 | ||
1482 | #ifdef PROFILE | |
1483 | pull_request_timer.stop(); | |
1484 | #endif | |
1485 | return result; | |
1486 | } // pull_request | |
1487 | ||
1488 | ||
1489 | protected: | |
1490 | ||
1491 | ||
1492 | // data_mtx should be held when called; unfortunately this | |
1493 | // function has to be repeated in both push & pull | |
1494 | // specializations | |
1495 | typename super::NextReq next_request() { | |
1496 | return next_request(get_time()); | |
1497 | } | |
1498 | }; // class PullPriorityQueue | |
1499 | ||
1500 | ||
1501 | // PUSH version | |
11fdf7f2 TL |
1502 | template<typename C, typename R, bool IsDelayed=false, bool U1=false, uint B=2> |
1503 | class PushPriorityQueue : public PriorityQueueBase<C,R,IsDelayed,U1,B> { | |
7c673cae FG |
1504 | |
1505 | protected: | |
1506 | ||
11fdf7f2 | 1507 | using super = PriorityQueueBase<C,R,IsDelayed,U1,B>; |
7c673cae FG |
1508 | |
1509 | public: | |
1510 | ||
1511 | // a function to see whether the server can handle another request | |
1512 | using CanHandleRequestFunc = std::function<bool(void)>; | |
1513 | ||
1514 | // a function to submit a request to the server; the second | |
1515 | // parameter is a callback when it's completed | |
1516 | using HandleRequestFunc = | |
11fdf7f2 | 1517 | std::function<void(const C&,typename super::RequestRef,PhaseType,uint64_t)>; |
7c673cae FG |
1518 | |
1519 | protected: | |
1520 | ||
1521 | CanHandleRequestFunc can_handle_f; | |
1522 | HandleRequestFunc handle_f; | |
1523 | // for handling timed scheduling | |
1524 | std::mutex sched_ahead_mtx; | |
1525 | std::condition_variable sched_ahead_cv; | |
1526 | Time sched_ahead_when = TimeZero; | |
1527 | ||
1528 | #ifdef PROFILE | |
1529 | public: | |
1530 | ProfileTimer<std::chrono::nanoseconds> add_request_timer; | |
1531 | ProfileTimer<std::chrono::nanoseconds> request_complete_timer; | |
1532 | protected: | |
1533 | #endif | |
1534 | ||
1535 | // NB: threads declared last, so constructed last and destructed first | |
1536 | ||
1537 | std::thread sched_ahead_thd; | |
1538 | ||
1539 | public: | |
1540 | ||
1541 | // push full constructor | |
1542 | template<typename Rep, typename Per> | |
1543 | PushPriorityQueue(typename super::ClientInfoFunc _client_info_f, | |
1544 | CanHandleRequestFunc _can_handle_f, | |
1545 | HandleRequestFunc _handle_f, | |
1546 | std::chrono::duration<Rep,Per> _idle_age, | |
1547 | std::chrono::duration<Rep,Per> _erase_age, | |
1548 | std::chrono::duration<Rep,Per> _check_time, | |
11fdf7f2 TL |
1549 | AtLimitParam at_limit_param = AtLimit::Wait, |
1550 | double anticipation_timeout = 0.0) : | |
7c673cae FG |
1551 | super(_client_info_f, |
1552 | _idle_age, _erase_age, _check_time, | |
11fdf7f2 | 1553 | at_limit_param, anticipation_timeout) |
7c673cae FG |
1554 | { |
1555 | can_handle_f = _can_handle_f; | |
1556 | handle_f = _handle_f; | |
1557 | sched_ahead_thd = std::thread(&PushPriorityQueue::run_sched_ahead, this); | |
1558 | } | |
1559 | ||
1560 | ||
1561 | // push convenience constructor | |
1562 | PushPriorityQueue(typename super::ClientInfoFunc _client_info_f, | |
1563 | CanHandleRequestFunc _can_handle_f, | |
1564 | HandleRequestFunc _handle_f, | |
11fdf7f2 TL |
1565 | AtLimitParam at_limit_param = AtLimit::Wait, |
1566 | double _anticipation_timeout = 0.0) : | |
7c673cae FG |
1567 | PushPriorityQueue(_client_info_f, |
1568 | _can_handle_f, | |
1569 | _handle_f, | |
11fdf7f2 TL |
1570 | standard_idle_age, |
1571 | standard_erase_age, | |
1572 | standard_check_time, | |
1573 | at_limit_param, | |
1574 | _anticipation_timeout) | |
7c673cae FG |
1575 | { |
1576 | // empty | |
1577 | } | |
1578 | ||
1579 | ||
1580 | ~PushPriorityQueue() { | |
1581 | this->finishing = true; | |
1582 | sched_ahead_cv.notify_one(); | |
1583 | sched_ahead_thd.join(); | |
1584 | } | |
1585 | ||
1586 | public: | |
1587 | ||
11fdf7f2 TL |
1588 | int add_request(R&& request, |
1589 | const C& client_id, | |
1590 | const ReqParams& req_params, | |
1591 | const Cost cost = 1u) { | |
1592 | return add_request(typename super::RequestRef(new R(std::move(request))), | |
1593 | client_id, | |
1594 | req_params, | |
1595 | get_time(), | |
1596 | cost); | |
7c673cae FG |
1597 | } |
1598 | ||
1599 | ||
11fdf7f2 TL |
1600 | int add_request(typename super::RequestRef&& request, |
1601 | const C& client_id, | |
1602 | const ReqParams& req_params, | |
1603 | const Cost cost = 1u) { | |
1604 | return add_request(request, req_params, client_id, get_time(), cost); | |
7c673cae FG |
1605 | } |
1606 | ||
1607 | ||
11fdf7f2 TL |
1608 | int add_request_time(const R& request, |
1609 | const C& client_id, | |
1610 | const ReqParams& req_params, | |
1611 | const Time time, | |
1612 | const Cost cost = 1u) { | |
1613 | return add_request(typename super::RequestRef(new R(request)), | |
1614 | client_id, | |
1615 | req_params, | |
1616 | time, | |
1617 | cost); | |
7c673cae FG |
1618 | } |
1619 | ||
1620 | ||
11fdf7f2 TL |
1621 | int add_request(typename super::RequestRef&& request, |
1622 | const C& client_id, | |
1623 | const ReqParams& req_params, | |
1624 | const Time time, | |
1625 | const Cost cost = 1u) { | |
7c673cae FG |
1626 | typename super::DataGuard g(this->data_mtx); |
1627 | #ifdef PROFILE | |
1628 | add_request_timer.start(); | |
1629 | #endif | |
11fdf7f2 TL |
1630 | int r = super::do_add_request(std::move(request), |
1631 | client_id, | |
1632 | req_params, | |
1633 | time, | |
1634 | cost); | |
1635 | if (r == 0) { | |
1636 | schedule_request(); | |
1637 | } | |
7c673cae FG |
1638 | #ifdef PROFILE |
1639 | add_request_timer.stop(); | |
1640 | #endif | |
11fdf7f2 | 1641 | return r; |
7c673cae FG |
1642 | } |
1643 | ||
1644 | ||
1645 | void request_completed() { | |
1646 | typename super::DataGuard g(this->data_mtx); | |
1647 | #ifdef PROFILE | |
1648 | request_complete_timer.start(); | |
1649 | #endif | |
1650 | schedule_request(); | |
1651 | #ifdef PROFILE | |
1652 | request_complete_timer.stop(); | |
1653 | #endif | |
1654 | } | |
1655 | ||
1656 | protected: | |
1657 | ||
1658 | // data_mtx should be held when called; furthermore, the heap | |
1659 | // should not be empty and the top element of the heap should | |
1660 | // not be already handled | |
1661 | // | |
1662 | // NOTE: the use of "super::ClientRec" in either the template | |
1663 | // construct or as a parameter to submit_top_request generated | |
1664 | // a compiler error in g++ 4.8.4, when ClientRec was | |
1665 | // "protected" rather than "public". By g++ 6.3.1 this was not | |
1666 | // an issue. But for backwards compatibility | |
1667 | // PriorityQueueBase::ClientRec is public. | |
1668 | template<typename C1, | |
1669 | IndIntruHeapData super::ClientRec::*C2, | |
1670 | typename C3, | |
1671 | uint B4> | |
11fdf7f2 TL |
1672 | typename super::RequestMeta |
1673 | submit_top_request(IndIntruHeap<C1,typename super::ClientRec,C2,C3,B4>& heap, | |
1674 | PhaseType phase) { | |
7c673cae | 1675 | C client_result; |
11fdf7f2 | 1676 | RequestTag tag = super::pop_process_request(heap, |
7c673cae FG |
1677 | [this, phase, &client_result] |
1678 | (const C& client, | |
11fdf7f2 | 1679 | const Cost request_cost, |
7c673cae FG |
1680 | typename super::RequestRef& request) { |
1681 | client_result = client; | |
11fdf7f2 | 1682 | handle_f(client, std::move(request), phase, request_cost); |
7c673cae | 1683 | }); |
11fdf7f2 TL |
1684 | typename super::RequestMeta req(client_result, tag); |
1685 | return req; | |
7c673cae FG |
1686 | } |
1687 | ||
1688 | ||
1689 | // data_mtx should be held when called | |
1690 | void submit_request(typename super::HeapId heap_id) { | |
7c673cae FG |
1691 | switch(heap_id) { |
1692 | case super::HeapId::reservation: | |
1693 | // don't need to note client | |
1694 | (void) submit_top_request(this->resv_heap, PhaseType::reservation); | |
1695 | // unlike the other two cases, we do not reduce reservation | |
1696 | // tags here | |
1697 | ++this->reserv_sched_count; | |
1698 | break; | |
1699 | case super::HeapId::ready: | |
11fdf7f2 TL |
1700 | { |
1701 | auto req = submit_top_request(this->ready_heap, PhaseType::priority); | |
1702 | super::reduce_reservation_tags(req.client_id, req.tag); | |
1703 | } | |
7c673cae FG |
1704 | ++this->prop_sched_count; |
1705 | break; | |
1706 | default: | |
1707 | assert(false); | |
1708 | } | |
1709 | } // submit_request | |
1710 | ||
1711 | ||
1712 | // data_mtx should be held when called; unfortunately this | |
1713 | // function has to be repeated in both push & pull | |
1714 | // specializations | |
1715 | typename super::NextReq next_request() { | |
1716 | return next_request(get_time()); | |
1717 | } | |
1718 | ||
1719 | ||
1720 | // data_mtx should be held when called; overrides member | |
1721 | // function in base class to add check for whether a request can | |
1722 | // be pushed to the server | |
1723 | typename super::NextReq next_request(Time now) { | |
1724 | if (!can_handle_f()) { | |
1725 | typename super::NextReq result; | |
1726 | result.type = super::NextReqType::none; | |
1727 | return result; | |
1728 | } else { | |
1729 | return super::do_next_request(now); | |
1730 | } | |
1731 | } // next_request | |
1732 | ||
1733 | ||
1734 | // data_mtx should be held when called | |
1735 | void schedule_request() { | |
1736 | typename super::NextReq next_req = next_request(); | |
1737 | switch (next_req.type) { | |
1738 | case super::NextReqType::none: | |
1739 | return; | |
1740 | case super::NextReqType::future: | |
1741 | sched_at(next_req.when_ready); | |
1742 | break; | |
1743 | case super::NextReqType::returning: | |
1744 | submit_request(next_req.heap_id); | |
1745 | break; | |
1746 | default: | |
1747 | assert(false); | |
1748 | } | |
1749 | } | |
1750 | ||
1751 | ||
1752 | // this is the thread that handles running schedule_request at | |
1753 | // future times when nothing can be scheduled immediately | |
1754 | void run_sched_ahead() { | |
1755 | std::unique_lock<std::mutex> l(sched_ahead_mtx); | |
1756 | ||
1757 | while (!this->finishing) { | |
1758 | if (TimeZero == sched_ahead_when) { | |
1759 | sched_ahead_cv.wait(l); | |
1760 | } else { | |
1761 | Time now; | |
1762 | while (!this->finishing && (now = get_time()) < sched_ahead_when) { | |
1763 | long microseconds_l = long(1 + 1000000 * (sched_ahead_when - now)); | |
1764 | auto microseconds = std::chrono::microseconds(microseconds_l); | |
1765 | sched_ahead_cv.wait_for(l, microseconds); | |
1766 | } | |
1767 | sched_ahead_when = TimeZero; | |
1768 | if (this->finishing) return; | |
1769 | ||
1770 | l.unlock(); | |
1771 | if (!this->finishing) { | |
1772 | typename super::DataGuard g(this->data_mtx); | |
1773 | schedule_request(); | |
1774 | } | |
1775 | l.lock(); | |
1776 | } | |
1777 | } | |
1778 | } | |
1779 | ||
1780 | ||
1781 | void sched_at(Time when) { | |
1782 | std::lock_guard<std::mutex> l(sched_ahead_mtx); | |
31f18b77 | 1783 | if (this->finishing) return; |
7c673cae FG |
1784 | if (TimeZero == sched_ahead_when || when < sched_ahead_when) { |
1785 | sched_ahead_when = when; | |
1786 | sched_ahead_cv.notify_one(); | |
1787 | } | |
1788 | } | |
1789 | }; // class PushPriorityQueue | |
1790 | ||
1791 | } // namespace dmclock | |
1792 | } // namespace crimson |