1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 /*
5 * Copyright (C) 2017 Red Hat Inc.
6 *
7 * Author: J. Eric Ivancich <ivancich@redhat.com>
8 *
9 * This is free software; you can redistribute it and/or modify it
10 * under the terms of the GNU Lesser General Public License version
11 * 2.1, as published by the Free Software Foundation. See file
12 * COPYING.
13 */
14
15
16 #pragma once
17
18 /* COMPILATION OPTIONS
19 *
20 * The prop_heap does not seem to be necessary. The only thing it
21  * would help with is quickly finding the minimum proportion/priority
22 * when an idle client became active. To have the code maintain the
23 * proportional heap, define USE_PROP_HEAP (i.e., compiler argument
24 * -DUSE_PROP_HEAP).
25 */
26
27 #include <assert.h>
28
29 #include <cmath>
30 #include <memory>
31 #include <map>
32 #include <deque>
33 #include <queue>
34 #include <atomic>
35 #include <mutex>
36 #include <condition_variable>
37 #include <thread>
38 #include <iostream>
39 #include <sstream>
40 #include <limits>
41
42 #include <boost/variant.hpp>
43
44 #include "indirect_intrusive_heap.h"
45 #include "../support/src/run_every.h"
46 #include "dmclock_util.h"
47 #include "dmclock_recs.h"
48
49 #ifdef PROFILE
50 #include "profile.h"
51 #endif
52
53
54 namespace crimson {
55
56 namespace dmclock {
57
58 namespace c = crimson;
59
60 constexpr double max_tag = std::numeric_limits<double>::is_iec559 ?
61 std::numeric_limits<double>::infinity() :
62 std::numeric_limits<double>::max();
63 constexpr double min_tag = std::numeric_limits<double>::is_iec559 ?
64 -std::numeric_limits<double>::infinity() :
65 std::numeric_limits<double>::lowest();
66 constexpr unsigned tag_modulo = 1000000;
67
68 constexpr auto standard_idle_age = std::chrono::seconds(300);
69 constexpr auto standard_erase_age = std::chrono::seconds(600);
70 constexpr auto standard_check_time = std::chrono::seconds(60);
71 constexpr auto aggressive_check_time = std::chrono::seconds(5);
72 constexpr unsigned standard_erase_max = 2000;
73
74 enum class AtLimit {
75 // requests are delayed until the limit is restored
76 Wait,
77 // requests are allowed to exceed their limit, if all other reservations
78 // are met and below their limits
79 Allow,
80 // if an incoming request would exceed its limit, add_request() will
81 // reject it with EAGAIN instead of adding it to the queue. cannot be used
82 // with DelayedTagCalc, because add_request() needs an accurate tag
83 Reject,
84 };
85
86     // when AtLimit::Reject is used, only reject an incoming request if its
87     // limit tag would exceed the current time by more than this threshold.
88     // requests under this threshold are enqueued and processed like AtLimit::Wait
89 using RejectThreshold = Time;
90
91 // the AtLimit constructor parameter can either accept AtLimit or a value
92 // for RejectThreshold (which implies AtLimit::Reject)
93 using AtLimitParam = boost::variant<AtLimit, RejectThreshold>;
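    // Illustrative examples (not part of the original header); either form
    // below is a valid AtLimitParam for the queue constructors later in this
    // file:
    //
    //   AtLimitParam p1 = AtLimit::Allow;        // named policy
    //   AtLimitParam p2 = RejectThreshold{10.0}; // implies AtLimit::Reject,
    //                                            // threshold of 10 in Time units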
94
95 struct ClientInfo {
96 double reservation; // minimum
97 double weight; // proportional
98 double limit; // maximum
99
100 // multiplicative inverses of above, which we use in calculations
101 // and don't want to recalculate repeatedly
102 double reservation_inv;
103 double weight_inv;
104 double limit_inv;
105
106 // order parameters -- min, "normal", max
107 ClientInfo(double _reservation, double _weight, double _limit) {
108 update(_reservation, _weight, _limit);
109 }
110
111 inline void update(double _reservation, double _weight, double _limit) {
112 reservation = _reservation;
113 weight = _weight;
114 limit = _limit;
115 reservation_inv = (0.0 == reservation) ? 0.0 : 1.0 / reservation;
116 weight_inv = (0.0 == weight) ? 0.0 : 1.0 / weight;
117 limit_inv = (0.0 == limit) ? 0.0 : 1.0 / limit;
118 }
119
120 friend std::ostream& operator<<(std::ostream& out,
121 const ClientInfo& client) {
122 out <<
123 "{ ClientInfo:: r:" << client.reservation <<
124 " w:" << std::fixed << client.weight <<
125 " l:" << std::fixed << client.limit <<
126 " 1/r:" << std::fixed << client.reservation_inv <<
127 " 1/w:" << std::fixed << client.weight_inv <<
128 " 1/l:" << std::fixed << client.limit_inv <<
129 " }";
130 return out;
131 }
132 }; // class ClientInfo
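    // Illustrative example (not part of the original header):
    //
    //   ClientInfo ci(100.0, 2.0, 500.0);
    //
    // reserves roughly 100 requests per unit of Time for the client, shares
    // spare capacity in proportion to a weight of 2.0, and caps the client
    // near 500 requests per unit of Time (subject to the AtLimit policy above).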
133
134
135 struct RequestTag {
136 double reservation;
137 double proportion;
138 double limit;
139 uint32_t delta;
140 uint32_t rho;
141 Cost cost;
142 bool ready; // true when within limit
143 Time arrival;
144
145 RequestTag(const RequestTag& prev_tag,
146 const ClientInfo& client,
147 const uint32_t _delta,
148 const uint32_t _rho,
149 const Time time,
150 const Cost _cost = 1u,
151 const double anticipation_timeout = 0.0) :
152 delta(_delta),
153 rho(_rho),
154 cost(_cost),
155 ready(false),
156 arrival(time)
157 {
158 assert(cost > 0);
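      // Anticipation (editorial sketch of intent, not from the original
      // comments): if this request arrived within anticipation_timeout of the
      // previous request's arrival, compute its tags as if it arrived
      // anticipation_timeout earlier, so short gaps between a client's
      // requests are not penalized.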
159 Time max_time = time;
160 if (time - anticipation_timeout < prev_tag.arrival)
161 max_time -= anticipation_timeout;
162
163 reservation = tag_calc(max_time,
164 prev_tag.reservation,
165 client.reservation_inv,
166 rho,
167 true,
168 cost);
169 proportion = tag_calc(max_time,
170 prev_tag.proportion,
171 client.weight_inv,
172 delta,
173 true,
174 cost);
175 limit = tag_calc(max_time,
176 prev_tag.limit,
177 client.limit_inv,
178 delta,
179 false,
180 cost);
181
182 assert(reservation < max_tag || proportion < max_tag);
183 }
184
185 RequestTag(const RequestTag& prev_tag,
186 const ClientInfo& client,
187 const ReqParams req_params,
188 const Time time,
189 const Cost cost = 1u,
190 const double anticipation_timeout = 0.0) :
191 RequestTag(prev_tag, client, req_params.delta, req_params.rho, time,
192 cost, anticipation_timeout)
193 { /* empty */ }
194
195 RequestTag(const double _res, const double _prop, const double _lim,
196 const Time _arrival,
197 const uint32_t _delta = 0,
198 const uint32_t _rho = 0,
199 const Cost _cost = 1u) :
200 reservation(_res),
201 proportion(_prop),
202 limit(_lim),
203 delta(_delta),
204 rho(_rho),
205 cost(_cost),
206 ready(false),
207 arrival(_arrival)
208 {
209 assert(cost > 0);
210 assert(reservation < max_tag || proportion < max_tag);
211 }
212
213 RequestTag(const RequestTag& other) :
214 reservation(other.reservation),
215 proportion(other.proportion),
216 limit(other.limit),
217 delta(other.delta),
218 rho(other.rho),
219 cost(other.cost),
220 ready(other.ready),
221 arrival(other.arrival)
222 { /* empty */ }
223
224 static std::string format_tag_change(double before, double after) {
225 if (before == after) {
226 return std::string("same");
227 } else {
228 std::stringstream ss;
229 ss << format_tag(before) << "=>" << format_tag(after);
230 return ss.str();
231 }
232 }
233
234 static std::string format_tag(double value) {
235 if (max_tag == value) {
236 return std::string("max");
237 } else if (min_tag == value) {
238 return std::string("min");
239 } else {
240 return format_time(value, tag_modulo);
241 }
242 }
243
244 private:
245
246 static double tag_calc(const Time time,
247 const double prev,
248 const double increment,
249 const uint32_t dist_req_val,
250 const bool extreme_is_high,
251 const Cost cost) {
252 if (0.0 == increment) {
253 return extreme_is_high ? max_tag : min_tag;
254 } else {
255 	// ensure 64-bit arithmetic before conversion to double
256 double tag_increment = increment * (uint64_t(dist_req_val) + cost);
257 return std::max(time, prev + tag_increment);
258 }
259 }
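      // Worked form of tag_calc (illustrative, matching the code above): with
      // reservation r, weight w, limit l (increments 1/r, 1/w, 1/l) the tags
      // become
      //
      //   R = max(t, prev_R + (rho   + cost) / r)
      //   P = max(t, prev_P + (delta + cost) / w)
      //   L = max(t, prev_L + (delta + cost) / l)
      //
      // where t is the (possibly anticipation-adjusted) arrival time. A zero
      // reservation or weight pins that tag to max_tag; a zero limit pins the
      // limit tag to min_tag.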
260
261 friend std::ostream& operator<<(std::ostream& out,
262 const RequestTag& tag) {
263 out <<
264 "{ RequestTag:: ready:" << (tag.ready ? "true" : "false") <<
265 " r:" << format_tag(tag.reservation) <<
266 " p:" << format_tag(tag.proportion) <<
267 " l:" << format_tag(tag.limit) <<
268 #if 0 // try to resolve this to make sure Time is operator<<'able.
269 " arrival:" << tag.arrival <<
270 #endif
271 " }";
272 return out;
273 }
274 }; // class RequestTag
275
276 // C is client identifier type, R is request type,
277 // IsDelayed controls whether tag calculation is delayed until the request
278 // reaches the front of its queue. This is an optimization over the
279 // originally published dmclock algorithm, allowing it to use the most
280 // recent values of rho and delta.
281 // U1 determines whether to use client information function dynamically,
282 // B is heap branching factor
283 template<typename C, typename R, bool IsDelayed, bool U1, unsigned B>
284 class PriorityQueueBase {
285 // we don't want to include gtest.h just for FRIEND_TEST
286 friend class dmclock_server_client_idle_erase_Test;
287
288 // types used for tag dispatch to select between implementations
289 using TagCalc = std::integral_constant<bool, IsDelayed>;
290 using DelayedTagCalc = std::true_type;
291 using ImmediateTagCalc = std::false_type;
292
293 public:
294
295 using RequestRef = std::unique_ptr<R>;
296
297 protected:
298
299 using TimePoint = decltype(std::chrono::steady_clock::now());
300 using Duration = std::chrono::milliseconds;
301 using MarkPoint = std::pair<TimePoint,Counter>;
302
303 enum class ReadyOption {ignore, lowers, raises};
304
305 // forward decl for friend decls
306 template<double RequestTag::*, ReadyOption, bool>
307 struct ClientCompare;
308
309 class ClientReq {
310 friend PriorityQueueBase;
311
312 RequestTag tag;
313 C client_id;
314 RequestRef request;
315
316 public:
317
318 ClientReq(const RequestTag& _tag,
319 const C& _client_id,
320 RequestRef&& _request) :
321 tag(_tag),
322 client_id(_client_id),
323 request(std::move(_request))
324 {
325 // empty
326 }
327
328 friend std::ostream& operator<<(std::ostream& out, const ClientReq& c) {
329 out << "{ ClientReq:: tag:" << c.tag << " client:" <<
330 c.client_id << " }";
331 return out;
332 }
333 }; // class ClientReq
334
335 struct RequestMeta {
336 C client_id;
337 RequestTag tag;
338
339 RequestMeta(const C& _client_id, const RequestTag& _tag) :
340 client_id(_client_id),
341 tag(_tag)
342 {
343 // empty
344 }
345 };
346
347 public:
348
349 // NOTE: ClientRec is in the "public" section for compatibility
350 // with g++ 4.8.4, which complains if it's not. By g++ 6.3.1
351 // ClientRec could be "protected" with no issue. [See comments
352 // associated with function submit_top_request.]
353 class ClientRec {
354 friend PriorityQueueBase<C,R,IsDelayed,U1,B>;
355
356 C client;
357 RequestTag prev_tag;
358 std::deque<ClientReq> requests;
359
360 // amount added from the proportion tag as a result of
361 // an idle client becoming unidle
362 double prop_delta = 0.0;
363
364 c::IndIntruHeapData reserv_heap_data {};
365 c::IndIntruHeapData lim_heap_data {};
366 c::IndIntruHeapData ready_heap_data {};
367 #if USE_PROP_HEAP
368 c::IndIntruHeapData prop_heap_data {};
369 #endif
370
371 public:
372
373 const ClientInfo* info;
374 bool idle;
375 Counter last_tick;
376 uint32_t cur_rho;
377 uint32_t cur_delta;
378
379 ClientRec(C _client,
380 const ClientInfo* _info,
381 Counter current_tick) :
382 client(_client),
383 prev_tag(0.0, 0.0, 0.0, TimeZero),
384 info(_info),
385 idle(true),
386 last_tick(current_tick),
387 cur_rho(1),
388 cur_delta(1)
389 {
390 // empty
391 }
392
393 inline const RequestTag& get_req_tag() const {
394 return prev_tag;
395 }
396
397 static inline void assign_unpinned_tag(double& lhs, const double rhs) {
398 if (rhs != max_tag && rhs != min_tag) {
399 lhs = rhs;
400 }
401 }
402
403 inline void update_req_tag(const RequestTag& _prev,
404 const Counter& _tick) {
405 assign_unpinned_tag(prev_tag.reservation, _prev.reservation);
406 assign_unpinned_tag(prev_tag.limit, _prev.limit);
407 assign_unpinned_tag(prev_tag.proportion, _prev.proportion);
408 prev_tag.arrival = _prev.arrival;
409 last_tick = _tick;
410 }
411
412 inline void add_request(const RequestTag& tag, RequestRef&& request) {
413 requests.emplace_back(tag, client, std::move(request));
414 }
415
416 inline const ClientReq& next_request() const {
417 return requests.front();
418 }
419
420 inline ClientReq& next_request() {
421 return requests.front();
422 }
423
424 inline void pop_request() {
425 requests.pop_front();
426 }
427
428 inline bool has_request() const {
429 return !requests.empty();
430 }
431
432 inline size_t request_count() const {
433 return requests.size();
434 }
435
436 // NB: because a deque is the underlying structure, this
437 // operation might be expensive
438 bool remove_by_req_filter_fw(std::function<bool(RequestRef&&)> filter_accum) {
439 bool any_removed = false;
440 for (auto i = requests.begin();
441 i != requests.end();
442 /* no inc */) {
443 if (filter_accum(std::move(i->request))) {
444 any_removed = true;
445 i = requests.erase(i);
446 } else {
447 ++i;
448 }
449 }
450 return any_removed;
451 }
452
453 // NB: because a deque is the underlying structure, this
454 // operation might be expensive
455 bool remove_by_req_filter_bw(std::function<bool(RequestRef&&)> filter_accum) {
456 bool any_removed = false;
457 for (auto i = requests.rbegin();
458 i != requests.rend();
459 /* no inc */) {
460 if (filter_accum(std::move(i->request))) {
461 any_removed = true;
462 i = decltype(i){ requests.erase(std::next(i).base()) };
463 } else {
464 ++i;
465 }
466 }
467 return any_removed;
468 }
469
470 inline bool
471 remove_by_req_filter(std::function<bool(RequestRef&&)> filter_accum,
472 bool visit_backwards) {
473 if (visit_backwards) {
474 return remove_by_req_filter_bw(filter_accum);
475 } else {
476 return remove_by_req_filter_fw(filter_accum);
477 }
478 }
479
480 friend std::ostream&
481 operator<<(std::ostream& out,
482 const typename PriorityQueueBase::ClientRec& e) {
483 out << "{ ClientRec::" <<
484 " client:" << e.client <<
485 " prev_tag:" << e.prev_tag <<
486 " req_count:" << e.requests.size() <<
487 " top_req:";
488 if (e.has_request()) {
489 out << e.next_request();
490 } else {
491 out << "none";
492 }
493 out << " }";
494
495 return out;
496 }
497 }; // class ClientRec
498
499 using ClientRecRef = std::shared_ptr<ClientRec>;
500
501 // when we try to get the next request, we'll be in one of three
502 // situations -- we'll have one to return, have one that can
503 // fire in the future, or not have any
504 enum class NextReqType { returning, future, none };
505
506 // specifies which queue next request will get popped from
507 enum class HeapId { reservation, ready };
508
509 // this is returned from next_req to tell the caller the situation
510 struct NextReq {
511 NextReqType type;
512 union {
513 HeapId heap_id;
514 Time when_ready;
515 };
516
517 inline explicit NextReq() :
518 type(NextReqType::none)
519 { }
520
521 inline NextReq(HeapId _heap_id) :
522 type(NextReqType::returning),
523 heap_id(_heap_id)
524 { }
525
526 inline NextReq(Time _when_ready) :
527 type(NextReqType::future),
528 when_ready(_when_ready)
529 { }
530
531 // calls to this are clearer than calls to the default
532 // constructor
533 static inline NextReq none() {
534 return NextReq();
535 }
536 };
537
538
539 // a function that can be called to look up client information
540 using ClientInfoFunc = std::function<const ClientInfo*(const C&)>;
541
542
543 bool empty() const {
544 DataGuard g(data_mtx);
545 return (resv_heap.empty() || ! resv_heap.top().has_request());
546 }
547
548
549 size_t client_count() const {
550 DataGuard g(data_mtx);
551 return resv_heap.size();
552 }
553
554
555 size_t request_count() const {
556 DataGuard g(data_mtx);
557 size_t total = 0;
558 for (auto i = resv_heap.cbegin(); i != resv_heap.cend(); ++i) {
559 total += i->request_count();
560 }
561 return total;
562 }
563
564
565 bool remove_by_req_filter(std::function<bool(RequestRef&&)> filter_accum,
566 bool visit_backwards = false) {
567 bool any_removed = false;
568 DataGuard g(data_mtx);
569 for (auto i : client_map) {
570 bool modified =
571 i.second->remove_by_req_filter(filter_accum, visit_backwards);
572 if (modified) {
573 resv_heap.adjust(*i.second);
574 limit_heap.adjust(*i.second);
575 ready_heap.adjust(*i.second);
576 #if USE_PROP_HEAP
577 prop_heap.adjust(*i.second);
578 #endif
579 any_removed = true;
580 }
581 }
582 return any_removed;
583 }
584
585
586     // use as a default value when no accumulator is provided
587 static void request_sink(RequestRef&& req) {
588 // do nothing
589 }
590
591
592 void remove_by_client(const C& client,
593 bool reverse = false,
594 std::function<void (RequestRef&&)> accum = request_sink) {
595 DataGuard g(data_mtx);
596
597 auto i = client_map.find(client);
598
599 if (i == client_map.end()) return;
600
601 if (reverse) {
602 for (auto j = i->second->requests.rbegin();
603 j != i->second->requests.rend();
604 ++j) {
605 accum(std::move(j->request));
606 }
607 } else {
608 for (auto j = i->second->requests.begin();
609 j != i->second->requests.end();
610 ++j) {
611 accum(std::move(j->request));
612 }
613 }
614
615 i->second->requests.clear();
616
617 resv_heap.adjust(*i->second);
618 limit_heap.adjust(*i->second);
619 ready_heap.adjust(*i->second);
620 #if USE_PROP_HEAP
621 prop_heap.adjust(*i->second);
622 #endif
623 }
624
625
626 unsigned get_heap_branching_factor() const {
627 return B;
628 }
629
630
631 void update_client_info(const C& client_id) {
632 DataGuard g(data_mtx);
633 auto client_it = client_map.find(client_id);
634 if (client_map.end() != client_it) {
635 ClientRec& client = (*client_it->second);
636 client.info = client_info_f(client_id);
637 }
638 }
639
640
641 void update_client_infos() {
642 DataGuard g(data_mtx);
643 for (auto i : client_map) {
644 i.second->info = client_info_f(i.second->client);
645 }
646 }
647
648
649 friend std::ostream& operator<<(std::ostream& out,
650 const PriorityQueueBase& q) {
651 std::lock_guard<decltype(q.data_mtx)> guard(q.data_mtx);
652
653 out << "{ PriorityQueue::";
654 for (const auto& c : q.client_map) {
655 out << " { client:" << c.first << ", record:" << *c.second <<
656 " }";
657 }
658 if (!q.resv_heap.empty()) {
659 const auto& resv = q.resv_heap.top();
660 out << " { reservation_top:" << resv << " }";
661 const auto& ready = q.ready_heap.top();
662 out << " { ready_top:" << ready << " }";
663 const auto& limit = q.limit_heap.top();
664 out << " { limit_top:" << limit << " }";
665 } else {
666 out << " HEAPS-EMPTY";
667 }
668 out << " }";
669
670 return out;
671 }
672
673 // for debugging
674 void display_queues(std::ostream& out,
675 bool show_res = true,
676 bool show_lim = true,
677 bool show_ready = true,
678 bool show_prop = true) const {
679 auto filter = [](const ClientRec& e)->bool { return true; };
680 DataGuard g(data_mtx);
681 if (show_res) {
682 resv_heap.display_sorted(out << "RESER:", filter);
683 }
684 if (show_lim) {
685 limit_heap.display_sorted(out << "LIMIT:", filter);
686 }
687 if (show_ready) {
688 ready_heap.display_sorted(out << "READY:", filter);
689 }
690 #if USE_PROP_HEAP
691 if (show_prop) {
692 prop_heap.display_sorted(out << "PROPO:", filter);
693 }
694 #endif
695 } // display_queues
696
697
698 protected:
699
700 // The ClientCompare functor is essentially doing a precedes?
701 // operator, returning true if and only if the first parameter
702 // must precede the second parameter. If the second must precede
703 // the first, or if they are equivalent, false should be
704 // returned. The reason for this behavior is that it will be
705 // called to test if two items are out of order and if true is
706 // returned it will reverse the items. Therefore false is the
707 // default return when it doesn't matter to prevent unnecessary
708 // re-ordering.
709 //
710 // The template is supporting variations in sorting based on the
711 // heap in question and allowing these variations to be handled
712 // at compile-time.
713 //
714 // tag_field determines which tag is being used for comparison
715 //
716 // ready_opt determines how the ready flag influences the sort
717 //
718 // use_prop_delta determines whether the proportional delta is
719 // added in for comparison
720 template<double RequestTag::*tag_field,
721 ReadyOption ready_opt,
722 bool use_prop_delta>
723 struct ClientCompare {
724 bool operator()(const ClientRec& n1, const ClientRec& n2) const {
725 if (n1.has_request()) {
726 if (n2.has_request()) {
727 const auto& t1 = n1.next_request().tag;
728 const auto& t2 = n2.next_request().tag;
729 if (ReadyOption::ignore == ready_opt || t1.ready == t2.ready) {
730 // if we don't care about ready or the ready values are the same
731 if (use_prop_delta) {
732 return (t1.*tag_field + n1.prop_delta) <
733 (t2.*tag_field + n2.prop_delta);
734 } else {
735 return t1.*tag_field < t2.*tag_field;
736 }
737 } else if (ReadyOption::raises == ready_opt) {
738 // use_ready == true && the ready fields are different
739 return t1.ready;
740 } else {
741 return t2.ready;
742 }
743 } else {
744 // n1 has request but n2 does not
745 return true;
746 }
747 } else if (n2.has_request()) {
748 // n2 has request but n1 does not
749 return false;
750 } else {
751 	  // both have none; keep stable with false
752 return false;
753 }
754 }
755 };
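    // For example (editorial note, not from the original source): ready_heap
    // below instantiates this as ClientCompare<&RequestTag::proportion,
    // ReadyOption::raises, true>, so ready requests sort ahead of non-ready
    // ones and proportion tags are compared with each client's prop_delta
    // added.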
756
757 ClientInfoFunc client_info_f;
758 static constexpr bool is_dynamic_cli_info_f = U1;
759
760 mutable std::mutex data_mtx;
761 using DataGuard = std::lock_guard<decltype(data_mtx)>;
762
763 // stable mapping between client ids and client queues
764 std::map<C,ClientRecRef> client_map;
765
766 c::IndIntruHeap<ClientRecRef,
767 ClientRec,
768 &ClientRec::reserv_heap_data,
769 ClientCompare<&RequestTag::reservation,
770 ReadyOption::ignore,
771 false>,
772 B> resv_heap;
773 #if USE_PROP_HEAP
774 c::IndIntruHeap<ClientRecRef,
775 ClientRec,
776 &ClientRec::prop_heap_data,
777 ClientCompare<&RequestTag::proportion,
778 ReadyOption::ignore,
779 true>,
780 B> prop_heap;
781 #endif
782 c::IndIntruHeap<ClientRecRef,
783 ClientRec,
784 &ClientRec::lim_heap_data,
785 ClientCompare<&RequestTag::limit,
786 ReadyOption::lowers,
787 false>,
788 B> limit_heap;
789 c::IndIntruHeap<ClientRecRef,
790 ClientRec,
791 &ClientRec::ready_heap_data,
792 ClientCompare<&RequestTag::proportion,
793 ReadyOption::raises,
794 true>,
795 B> ready_heap;
796
797 AtLimit at_limit;
798 RejectThreshold reject_threshold = 0;
799
800 double anticipation_timeout;
801
802 std::atomic_bool finishing;
803
804 // every request creates a tick
805 Counter tick = 0;
806
807 // performance data collection
808 size_t reserv_sched_count = 0;
809 size_t prop_sched_count = 0;
810 size_t limit_break_sched_count = 0;
811
812 Duration idle_age;
813 Duration erase_age;
814 Duration check_time;
815 std::deque<MarkPoint> clean_mark_points;
816 // max number of clients to erase at a time
817 Counter erase_max;
818 // unfinished last erase point
819 Counter last_erase_point = 0;
820
821 // NB: All threads declared at end, so they're destructed first!
822
823 std::unique_ptr<RunEvery> cleaning_job;
824
825 // helper function to return the value of a variant if it matches the
826 // given type T, or a default value of T otherwise
827 template <typename T, typename Variant>
828 static T get_or_default(const Variant& param, T default_value) {
829 const T *p = boost::get<T>(&param);
830 return p ? *p : default_value;
831 }
832
833     // COMMON constructor that others feed into; the three age/check
834     // durations may be passed in any std::chrono::duration type
835 template<typename Rep, typename Per>
836 PriorityQueueBase(ClientInfoFunc _client_info_f,
837 std::chrono::duration<Rep,Per> _idle_age,
838 std::chrono::duration<Rep,Per> _erase_age,
839 std::chrono::duration<Rep,Per> _check_time,
840 AtLimitParam at_limit_param,
841 double _anticipation_timeout) :
842 client_info_f(_client_info_f),
843 at_limit(get_or_default(at_limit_param, AtLimit::Reject)),
844 reject_threshold(get_or_default(at_limit_param, RejectThreshold{0})),
845 anticipation_timeout(_anticipation_timeout),
846 finishing(false),
847 idle_age(std::chrono::duration_cast<Duration>(_idle_age)),
848 erase_age(std::chrono::duration_cast<Duration>(_erase_age)),
849 check_time(std::chrono::duration_cast<Duration>(_check_time)),
850 erase_max(standard_erase_max)
851 {
852 assert(_erase_age >= _idle_age);
853 assert(_check_time < _idle_age);
854 // AtLimit::Reject depends on ImmediateTagCalc
855 assert(at_limit != AtLimit::Reject || !IsDelayed);
856 cleaning_job =
857 std::unique_ptr<RunEvery>(
858 new RunEvery(check_time,
859 std::bind(&PriorityQueueBase::do_clean, this)));
860 }
861
862
863 ~PriorityQueueBase() {
864 finishing = true;
865 }
866
867
868 inline const ClientInfo* get_cli_info(ClientRec& client) const {
869 if (is_dynamic_cli_info_f) {
870 client.info = client_info_f(client.client);
871 }
872 return client.info;
873 }
874
875 // data_mtx must be held by caller
876 RequestTag initial_tag(DelayedTagCalc delayed, ClientRec& client,
877 const ReqParams& params, Time time, Cost cost) {
878 RequestTag tag(0, 0, 0, time, 0, 0, cost);
879
880 // only calculate a tag if the request is going straight to the front
881 if (!client.has_request()) {
882 const ClientInfo* client_info = get_cli_info(client);
883 assert(client_info);
884 tag = RequestTag(client.get_req_tag(), *client_info,
885 params, time, cost, anticipation_timeout);
886
887 // copy tag to previous tag for client
888 client.update_req_tag(tag, tick);
889 }
890 return tag;
891 }
892
893 // data_mtx must be held by caller
894 RequestTag initial_tag(ImmediateTagCalc imm, ClientRec& client,
895 const ReqParams& params, Time time, Cost cost) {
896 // calculate the tag unconditionally
897 const ClientInfo* client_info = get_cli_info(client);
898 assert(client_info);
899 RequestTag tag(client.get_req_tag(), *client_info,
900 params, time, cost, anticipation_timeout);
901
902 // copy tag to previous tag for client
903 client.update_req_tag(tag, tick);
904 return tag;
905 }
906
907 // data_mtx must be held by caller. returns 0 on success. when using
908 // AtLimit::Reject, requests that would exceed their limit are rejected
909 // with EAGAIN, and the queue will not take ownership of the given
910 // 'request' argument
911 int do_add_request(RequestRef&& request,
912 const C& client_id,
913 const ReqParams& req_params,
914 const Time time,
915 const Cost cost = 1u) {
916 ++tick;
917
918 auto insert = client_map.emplace(client_id, ClientRecRef{});
919 if (insert.second) {
920 // new client entry
921 const ClientInfo* info = client_info_f(client_id);
922 auto client_rec = std::make_shared<ClientRec>(client_id, info, tick);
923 resv_heap.push(client_rec);
924 #if USE_PROP_HEAP
925 prop_heap.push(client_rec);
926 #endif
927 limit_heap.push(client_rec);
928 ready_heap.push(client_rec);
929 insert.first->second = std::move(client_rec);
930 }
931
932 // for convenience, we'll create a reference to the shared pointer
933 ClientRec& client = *insert.first->second;
934
935 if (client.idle) {
936 // We need to do an adjustment so that idle clients compete
937 // fairly on proportional tags since those tags may have
938 // drifted from real-time. Either use the lowest existing
939 // proportion tag -- O(1) -- or the client with the lowest
940 // previous proportion tag -- O(n) where n = # clients.
941 //
942 	// Since we don't maintain a proportional queue that
943 	// tracks the minimum proportional tag alone (we're
944 	// instead using a ready queue), we have to check each
945 	// client.
946 	//
947 	// The alternative would be to maintain a proportional queue
948 	// (define USE_PROP_HEAP) and do an O(1) operation here.
949
950 // Was unable to confirm whether equality testing on
951 // std::numeric_limits<double>::max() is guaranteed, so
952 // we'll use a compile-time calculated trigger that is one
953 // third the max, which should be much larger than any
954 // expected organic value.
955 constexpr double lowest_prop_tag_trigger =
956 std::numeric_limits<double>::max() / 3.0;
957
958 double lowest_prop_tag = std::numeric_limits<double>::max();
959 for (auto const &c : client_map) {
960 // don't use ourselves (or anything else that might be
961 // listed as idle) since we're now in the map
962 if (!c.second->idle) {
963 double p;
964 // use either lowest proportion tag or previous proportion tag
965 if (c.second->has_request()) {
966 p = c.second->next_request().tag.proportion +
967 c.second->prop_delta;
968 } else {
969 p = c.second->get_req_tag().proportion + c.second->prop_delta;
970 }
971
972 if (p < lowest_prop_tag) {
973 lowest_prop_tag = p;
974 }
975 }
976 }
977
978 	// if this conditional does not fire, no non-idle client had a usable proportion tag, so prop_delta is left unchanged
979 if (lowest_prop_tag < lowest_prop_tag_trigger) {
980 client.prop_delta = lowest_prop_tag - time;
981 }
982 client.idle = false;
983 } // if this client was idle
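      // Illustrative example (not from the original source): if the lowest
      // proportion tag among active clients is 105.2 and the current time is
      // 100.0, the newly-active client gets prop_delta = 5.2; its proportion
      // tags are then compared with that offset added (see ClientCompare), so
      // it competes fairly rather than starving the already-active clients.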
984
985 RequestTag tag = initial_tag(TagCalc{}, client, req_params, time, cost);
986
987 if (at_limit == AtLimit::Reject &&
988 tag.limit > time + reject_threshold) {
989 // if the client is over its limit, reject it here
990 return EAGAIN;
991 }
992
993 client.add_request(tag, std::move(request));
994 if (1 == client.requests.size()) {
995 // NB: can the following 4 calls to adjust be changed
996 // promote? Can adding a request ever demote a client in the
997 // heaps?
998 resv_heap.adjust(client);
999 limit_heap.adjust(client);
1000 ready_heap.adjust(client);
1001 #if USE_PROP_HEAP
1002 prop_heap.adjust(client);
1003 #endif
1004 }
1005
1006 client.cur_rho = req_params.rho;
1007 client.cur_delta = req_params.delta;
1008
1009 resv_heap.adjust(client);
1010 limit_heap.adjust(client);
1011 ready_heap.adjust(client);
1012 #if USE_PROP_HEAP
1013 prop_heap.adjust(client);
1014 #endif
1015 return 0;
1016 } // do_add_request
1017
1018 // data_mtx must be held by caller
1019 void update_next_tag(DelayedTagCalc delayed, ClientRec& top,
1020 const RequestTag& tag) {
1021 if (top.has_request()) {
1022 // perform delayed tag calculation on the next request
1023 ClientReq& next_first = top.next_request();
1024 const ClientInfo* client_info = get_cli_info(top);
1025 assert(client_info);
1026 next_first.tag = RequestTag(tag, *client_info,
1027 top.cur_delta, top.cur_rho,
1028 next_first.tag.arrival,
1029 next_first.tag.cost,
1030 anticipation_timeout);
1031 // copy tag to previous tag for client
1032 top.update_req_tag(next_first.tag, tick);
1033 }
1034 }
1035
1036 void update_next_tag(ImmediateTagCalc imm, ClientRec& top,
1037 const RequestTag& tag) {
1038 // the next tag was already calculated on insertion
1039 }
1040
1041 // data_mtx should be held when called; top of heap should have
1042 // a ready request
1043 template<typename C1, IndIntruHeapData ClientRec::*C2, typename C3>
1044 RequestTag pop_process_request(IndIntruHeap<C1, ClientRec, C2, C3, B>& heap,
1045 std::function<void(const C& client,
1046 const Cost cost,
1047 RequestRef& request)> process) {
1048 // gain access to data
1049 ClientRec& top = heap.top();
1050
1051 Cost request_cost = top.next_request().tag.cost;
1052 RequestRef request = std::move(top.next_request().request);
1053 RequestTag tag = top.next_request().tag;
1054
1055 // pop request and adjust heaps
1056 top.pop_request();
1057
1058 update_next_tag(TagCalc{}, top, tag);
1059
1060 resv_heap.demote(top);
1061 limit_heap.adjust(top);
1062 #if USE_PROP_HEAP
1063 prop_heap.demote(top);
1064 #endif
1065 ready_heap.demote(top);
1066
1067 // process
1068 process(top.client, request_cost, request);
1069
1070 return tag;
1071 } // pop_process_request
1072
1073
1074 // data_mtx must be held by caller
1075 void reduce_reservation_tags(DelayedTagCalc delayed, ClientRec& client,
1076 const RequestTag& tag) {
1077 if (!client.requests.empty()) {
1078 // only maintain a tag for the first request
1079 auto& r = client.requests.front();
1080 r.tag.reservation -=
1081 client.info->reservation_inv * std::max(uint32_t(1), tag.rho);
1082 }
1083 }
1084
1085 // data_mtx should be held when called
1086 void reduce_reservation_tags(ImmediateTagCalc imm, ClientRec& client,
1087 const RequestTag& tag) {
1088 double res_offset =
1089 client.info->reservation_inv * std::max(uint32_t(1), tag.rho);
1090 for (auto& r : client.requests) {
1091 r.tag.reservation -= res_offset;
1092 }
1093 }
1094
1095 // data_mtx should be held when called
1096 void reduce_reservation_tags(const C& client_id, const RequestTag& tag) {
1097 auto client_it = client_map.find(client_id);
1098
1099 // means the client was cleaned from map; should never happen
1100 // as long as cleaning times are long enough
1101 assert(client_map.end() != client_it);
1102 ClientRec& client = *client_it->second;
1103 reduce_reservation_tags(TagCalc{}, client, tag);
1104
1105 // don't forget to update previous tag
1106 client.prev_tag.reservation -=
1107 client.info->reservation_inv * std::max(uint32_t(1), tag.rho);
1108 resv_heap.promote(client);
1109 }
1110
1111
1112 // data_mtx should be held when called
1113 NextReq do_next_request(Time now) {
1114 // if reservation queue is empty, all are empty (i.e., no
1115 // active clients)
1116 if(resv_heap.empty()) {
1117 return NextReq::none();
1118 }
1119
1120 // try constraint (reservation) based scheduling
1121
1122 auto& reserv = resv_heap.top();
1123 if (reserv.has_request() &&
1124 reserv.next_request().tag.reservation <= now) {
1125 return NextReq(HeapId::reservation);
1126 }
1127
1128 // no existing reservations before now, so try weight-based
1129 // scheduling
1130
1131 // all items that are within limit are eligible based on
1132 // priority
1133 auto limits = &limit_heap.top();
1134 while (limits->has_request() &&
1135 !limits->next_request().tag.ready &&
1136 limits->next_request().tag.limit <= now) {
1137 limits->next_request().tag.ready = true;
1138 ready_heap.promote(*limits);
1139 limit_heap.demote(*limits);
1140
1141 limits = &limit_heap.top();
1142 }
1143
1144 auto& readys = ready_heap.top();
1145 if (readys.has_request() &&
1146 readys.next_request().tag.ready &&
1147 readys.next_request().tag.proportion < max_tag) {
1148 return NextReq(HeapId::ready);
1149 }
1150
1151 // if nothing is schedulable by reservation or
1152 // proportion/weight, and if we allow limit break, try to
1153 // schedule something with the lowest proportion tag or
1154 // alternatively lowest reservation tag.
1155 if (at_limit == AtLimit::Allow) {
1156 if (readys.has_request() &&
1157 readys.next_request().tag.proportion < max_tag) {
1158 return NextReq(HeapId::ready);
1159 } else if (reserv.has_request() &&
1160 reserv.next_request().tag.reservation < max_tag) {
1161 return NextReq(HeapId::reservation);
1162 }
1163 }
1164
1165 // nothing scheduled; make sure we re-run when next
1166 // reservation item or next limited item comes up
1167
1168 Time next_call = TimeMax;
1169 if (resv_heap.top().has_request()) {
1170 next_call =
1171 min_not_0_time(next_call,
1172 resv_heap.top().next_request().tag.reservation);
1173 }
1174 if (limit_heap.top().has_request()) {
1175 const auto& next = limit_heap.top().next_request();
1176 assert(!next.tag.ready || max_tag == next.tag.proportion);
1177 next_call = min_not_0_time(next_call, next.tag.limit);
1178 }
1179 if (next_call < TimeMax) {
1180 return NextReq(next_call);
1181 } else {
1182 return NextReq::none();
1183 }
1184 } // do_next_request
1185
1186
1187 // if possible is not zero and less than current then return it;
1188 // otherwise return current; the idea is we're trying to find
1189 // the minimal time but ignoring zero
1190 static inline const Time& min_not_0_time(const Time& current,
1191 const Time& possible) {
1192 return TimeZero == possible ? current : std::min(current, possible);
1193 }
1194
1195
1196 /*
1197      * This is called regularly by RunEvery. Each time it's
1198      * called it records the time and the tick counter (a mark point) in a
1199      * deque. It then looks at the deque to find the most recent
1200      * mark points older than erase_age and idle_age, walks the client
1201      * map, erases records last used before the erase point, and marks
1202      * idle those last used before the idle point.
1203 */
1204 void do_clean() {
1205 TimePoint now = std::chrono::steady_clock::now();
1206 DataGuard g(data_mtx);
1207 clean_mark_points.emplace_back(MarkPoint(now, tick));
1208
1209 // first erase the super-old client records
1210
1211 Counter erase_point = last_erase_point;
1212 auto point = clean_mark_points.front();
1213 while (point.first <= now - erase_age) {
1214 last_erase_point = point.second;
1215 erase_point = last_erase_point;
1216 clean_mark_points.pop_front();
1217 point = clean_mark_points.front();
1218 }
1219
1220 Counter idle_point = 0;
1221 for (auto i : clean_mark_points) {
1222 if (i.first <= now - idle_age) {
1223 idle_point = i.second;
1224 } else {
1225 break;
1226 }
1227 }
1228
1229 Counter erased_num = 0;
1230 if (erase_point > 0 || idle_point > 0) {
1231 for (auto i = client_map.begin(); i != client_map.end(); /* empty */) {
1232 auto i2 = i++;
1233 if (erase_point &&
1234 erased_num < erase_max &&
1235 i2->second->last_tick <= erase_point) {
1236 delete_from_heaps(i2->second);
1237 client_map.erase(i2);
1238 erased_num++;
1239 } else if (idle_point && i2->second->last_tick <= idle_point) {
1240 i2->second->idle = true;
1241 }
1242 } // for
1243
1244 auto wperiod = check_time;
1245 if (erased_num >= erase_max) {
1246 	  wperiod = std::chrono::duration_cast<Duration>(aggressive_check_time);
1247 } else {
1248 // clean finished, refresh
1249 last_erase_point = 0;
1250 }
1251 cleaning_job->try_update(wperiod);
1252 } // if
1253 } // do_clean
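    // Illustrative timeline (editorial, using the standard durations): with
    // check_time = 60s, idle_age = 300s and erase_age = 600s, a client whose
    // last_tick precedes the mark point taken ~300s ago is marked idle, and
    // one whose last_tick precedes the mark point taken ~600s ago is erased
    // (at most erase_max per pass; remaining work is retried sooner via
    // aggressive_check_time).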
1254
1255
1256 // data_mtx must be held by caller
1257 template<IndIntruHeapData ClientRec::*C1,typename C2>
1258 void delete_from_heap(ClientRecRef& client,
1259 c::IndIntruHeap<ClientRecRef,ClientRec,C1,C2,B>& heap) {
1260 auto i = heap.at(client);
1261 heap.remove(i);
1262 }
1263
1264
1265 // data_mtx must be held by caller
1266 void delete_from_heaps(ClientRecRef& client) {
1267 delete_from_heap(client, resv_heap);
1268 #if USE_PROP_HEAP
1269 delete_from_heap(client, prop_heap);
1270 #endif
1271 delete_from_heap(client, limit_heap);
1272 delete_from_heap(client, ready_heap);
1273 }
1274 }; // class PriorityQueueBase
1275
1276
1277 template<typename C, typename R, bool IsDelayed=false, bool U1=false, unsigned B=2>
1278 class PullPriorityQueue : public PriorityQueueBase<C,R,IsDelayed,U1,B> {
1279 using super = PriorityQueueBase<C,R,IsDelayed,U1,B>;
1280
1281 public:
1282
1283 // When a request is pulled, this is the return type.
1284 struct PullReq {
1285 struct Retn {
1286 C client;
1287 typename super::RequestRef request;
1288 PhaseType phase;
1289 Cost cost;
1290 };
1291
1292 typename super::NextReqType type;
1293 boost::variant<Retn,Time> data;
1294
1295 bool is_none() const { return type == super::NextReqType::none; }
1296
1297 bool is_retn() const { return type == super::NextReqType::returning; }
1298 Retn& get_retn() {
1299 return boost::get<Retn>(data);
1300 }
1301
1302 bool is_future() const { return type == super::NextReqType::future; }
1303 Time getTime() const { return boost::get<Time>(data); }
1304 };
1305
1306
1307 #ifdef PROFILE
1308 ProfileTimer<std::chrono::nanoseconds> pull_request_timer;
1309 ProfileTimer<std::chrono::nanoseconds> add_request_timer;
1310 #endif
1311
1312 template<typename Rep, typename Per>
1313 PullPriorityQueue(typename super::ClientInfoFunc _client_info_f,
1314 std::chrono::duration<Rep,Per> _idle_age,
1315 std::chrono::duration<Rep,Per> _erase_age,
1316 std::chrono::duration<Rep,Per> _check_time,
1317 AtLimitParam at_limit_param = AtLimit::Wait,
1318 double _anticipation_timeout = 0.0) :
1319 super(_client_info_f,
1320 _idle_age, _erase_age, _check_time,
1321 at_limit_param, _anticipation_timeout)
1322 {
1323 // empty
1324 }
1325
1326
1327 // pull convenience constructor
1328 PullPriorityQueue(typename super::ClientInfoFunc _client_info_f,
1329 AtLimitParam at_limit_param = AtLimit::Wait,
1330 double _anticipation_timeout = 0.0) :
1331 PullPriorityQueue(_client_info_f,
1332 standard_idle_age,
1333 standard_erase_age,
1334 standard_check_time,
1335 at_limit_param,
1336 _anticipation_timeout)
1337 {
1338 // empty
1339 }
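    // Usage sketch (illustrative only; ClientId, MyRequest and client_infos
    // are hypothetical application-side names, not part of this header). The
    // queue stores only a pointer to each ClientInfo, so those objects must
    // remain valid while the queue uses them.
    //
    //   PullPriorityQueue<ClientId,MyRequest> pq(
    //     [&](const ClientId& c) -> const ClientInfo* {
    //       return &client_infos[c];
    //     });
    //   pq.add_request(MyRequest{}, client_id);  // default ReqParams
    //   auto pr = pq.pull_request();             // pull at the current time
    //   if (pr.is_retn()) {
    //     auto& retn = pr.get_retn();
    //     // dispatch *retn.request for retn.client in phase retn.phase
    //   } else if (pr.is_future()) {
    //     // nothing ready until pr.getTime()
    //   }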
1340
1341
1342 int add_request(R&& request,
1343 const C& client_id,
1344 const ReqParams& req_params,
1345 const Cost cost = 1u) {
1346 return add_request(typename super::RequestRef(new R(std::move(request))),
1347 client_id,
1348 req_params,
1349 get_time(),
1350 cost);
1351 }
1352
1353
1354 int add_request(R&& request,
1355 const C& client_id,
1356 const Cost cost = 1u) {
1357 static const ReqParams null_req_params;
1358 return add_request(typename super::RequestRef(new R(std::move(request))),
1359 client_id,
1360 null_req_params,
1361 get_time(),
1362 cost);
1363 }
1364
1365
1366 int add_request_time(R&& request,
1367 const C& client_id,
1368 const ReqParams& req_params,
1369 const Time time,
1370 const Cost cost = 1u) {
1371 return add_request(typename super::RequestRef(new R(std::move(request))),
1372 client_id,
1373 req_params,
1374 time,
1375 cost);
1376 }
1377
1378
1379 int add_request(typename super::RequestRef&& request,
1380 const C& client_id,
1381 const ReqParams& req_params,
1382 const Cost cost = 1u) {
1383       return add_request(std::move(request), client_id, req_params, get_time(), cost);
1384 }
1385
1386
1387 int add_request(typename super::RequestRef&& request,
1388 const C& client_id,
1389 const Cost cost = 1u) {
1390 static const ReqParams null_req_params;
1391       return add_request(std::move(request), client_id, null_req_params, get_time(), cost);
1392 }
1393
1394
1395 // this does the work; the versions above provide alternate interfaces
1396 int add_request(typename super::RequestRef&& request,
1397 const C& client_id,
1398 const ReqParams& req_params,
1399 const Time time,
1400 const Cost cost = 1u) {
1401 typename super::DataGuard g(this->data_mtx);
1402 #ifdef PROFILE
1403 add_request_timer.start();
1404 #endif
1405 int r = super::do_add_request(std::move(request),
1406 client_id,
1407 req_params,
1408 time,
1409 cost);
1410 // no call to schedule_request for pull version
1411 #ifdef PROFILE
1412 add_request_timer.stop();
1413 #endif
1414 return r;
1415 }
1416
1417
1418 inline PullReq pull_request() {
1419 return pull_request(get_time());
1420 }
1421
1422
1423 PullReq pull_request(const Time now) {
1424 PullReq result;
1425 typename super::DataGuard g(this->data_mtx);
1426 #ifdef PROFILE
1427 pull_request_timer.start();
1428 #endif
1429
1430 typename super::NextReq next = super::do_next_request(now);
1431 result.type = next.type;
1432 switch(next.type) {
1433 case super::NextReqType::none:
1434 return result;
1435 case super::NextReqType::future:
1436 result.data = next.when_ready;
1437 return result;
1438 case super::NextReqType::returning:
1439 // to avoid nesting, break out and let code below handle this case
1440 break;
1441 default:
1442 assert(false);
1443 }
1444
1445 // we'll only get here if we're returning an entry
1446
1447 auto process_f =
1448 [&] (PullReq& pull_result, PhaseType phase) ->
1449 std::function<void(const C&,
1450 uint64_t,
1451 typename super::RequestRef&)> {
1452 return [&pull_result, phase](const C& client,
1453 const Cost request_cost,
1454 typename super::RequestRef& request) {
1455 pull_result.data = typename PullReq::Retn{ client,
1456 std::move(request),
1457 phase,
1458 request_cost };
1459 };
1460 };
1461
1462 switch(next.heap_id) {
1463 case super::HeapId::reservation:
1464 (void) super::pop_process_request(this->resv_heap,
1465 process_f(result,
1466 PhaseType::reservation));
1467 ++this->reserv_sched_count;
1468 break;
1469 case super::HeapId::ready:
1470 {
1471 auto tag = super::pop_process_request(this->ready_heap,
1472 process_f(result, PhaseType::priority));
1473 // need to use retn temporarily
1474 auto& retn = boost::get<typename PullReq::Retn>(result.data);
1475 super::reduce_reservation_tags(retn.client, tag);
1476 }
1477 ++this->prop_sched_count;
1478 break;
1479 default:
1480 assert(false);
1481 }
1482
1483 #ifdef PROFILE
1484 pull_request_timer.stop();
1485 #endif
1486 return result;
1487 } // pull_request
1488
1489
1490 protected:
1491
1492
1493 // data_mtx should be held when called; unfortunately this
1494 // function has to be repeated in both push & pull
1495 // specializations
1496 typename super::NextReq next_request() {
1497       return super::do_next_request(get_time());
1498 }
1499 }; // class PullPriorityQueue
1500
1501
1502 // PUSH version
1503 template<typename C, typename R, bool IsDelayed=false, bool U1=false, unsigned B=2>
1504 class PushPriorityQueue : public PriorityQueueBase<C,R,IsDelayed,U1,B> {
1505
1506 protected:
1507
1508 using super = PriorityQueueBase<C,R,IsDelayed,U1,B>;
1509
1510 public:
1511
1512 // a function to see whether the server can handle another request
1513 using CanHandleRequestFunc = std::function<bool(void)>;
1514
1515 // a function to submit a request to the server; the second
1516 // parameter is a callback when it's completed
1517 using HandleRequestFunc =
1518 std::function<void(const C&,typename super::RequestRef,PhaseType,uint64_t)>;
1519
1520 protected:
1521
1522 CanHandleRequestFunc can_handle_f;
1523 HandleRequestFunc handle_f;
1524 // for handling timed scheduling
1525 std::mutex sched_ahead_mtx;
1526 std::condition_variable sched_ahead_cv;
1527 Time sched_ahead_when = TimeZero;
1528
1529 #ifdef PROFILE
1530 public:
1531 ProfileTimer<std::chrono::nanoseconds> add_request_timer;
1532 ProfileTimer<std::chrono::nanoseconds> request_complete_timer;
1533 protected:
1534 #endif
1535
1536 // NB: threads declared last, so constructed last and destructed first
1537
1538 std::thread sched_ahead_thd;
1539
1540 public:
1541
1542 // push full constructor
1543 template<typename Rep, typename Per>
1544 PushPriorityQueue(typename super::ClientInfoFunc _client_info_f,
1545 CanHandleRequestFunc _can_handle_f,
1546 HandleRequestFunc _handle_f,
1547 std::chrono::duration<Rep,Per> _idle_age,
1548 std::chrono::duration<Rep,Per> _erase_age,
1549 std::chrono::duration<Rep,Per> _check_time,
1550 AtLimitParam at_limit_param = AtLimit::Wait,
1551 double anticipation_timeout = 0.0) :
1552 super(_client_info_f,
1553 _idle_age, _erase_age, _check_time,
1554 at_limit_param, anticipation_timeout)
1555 {
1556 can_handle_f = _can_handle_f;
1557 handle_f = _handle_f;
1558 sched_ahead_thd = std::thread(&PushPriorityQueue::run_sched_ahead, this);
1559 }
1560
1561
1562 // push convenience constructor
1563 PushPriorityQueue(typename super::ClientInfoFunc _client_info_f,
1564 CanHandleRequestFunc _can_handle_f,
1565 HandleRequestFunc _handle_f,
1566 AtLimitParam at_limit_param = AtLimit::Wait,
1567 double _anticipation_timeout = 0.0) :
1568 PushPriorityQueue(_client_info_f,
1569 _can_handle_f,
1570 _handle_f,
1571 standard_idle_age,
1572 standard_erase_age,
1573 standard_check_time,
1574 at_limit_param,
1575 _anticipation_timeout)
1576 {
1577 // empty
1578 }
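    // Usage sketch (illustrative only; ClientId, MyRequest, client_info_f,
    // server_has_slot and dispatch are hypothetical application-side names).
    // In the push model the queue drives the server: handle_f is called
    // whenever a request is scheduled, and request_completed() should be
    // called when the server finishes one so the next request can be pushed.
    //
    //   PushPriorityQueue<ClientId,MyRequest> pq(
    //     client_info_f,
    //     []() { return server_has_slot(); },
    //     [](const ClientId& c, std::unique_ptr<MyRequest> r,
    //        PhaseType phase, uint64_t cost) {
    //       dispatch(c, std::move(r), phase, cost);
    //     });
    //   pq.add_request(MyRequest{}, client_id, ReqParams());
    //   // later, when the server finishes a request:
    //   pq.request_completed();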
1579
1580
1581 ~PushPriorityQueue() {
1582 this->finishing = true;
1583 sched_ahead_cv.notify_one();
1584 sched_ahead_thd.join();
1585 }
1586
1587 public:
1588
1589 int add_request(R&& request,
1590 const C& client_id,
1591 const ReqParams& req_params,
1592 const Cost cost = 1u) {
1593 return add_request(typename super::RequestRef(new R(std::move(request))),
1594 client_id,
1595 req_params,
1596 get_time(),
1597 cost);
1598 }
1599
1600
1601 int add_request(typename super::RequestRef&& request,
1602 const C& client_id,
1603 const ReqParams& req_params,
1604 const Cost cost = 1u) {
1605       return add_request(std::move(request), client_id, req_params, get_time(), cost);
1606 }
1607
1608
1609 int add_request_time(const R& request,
1610 const C& client_id,
1611 const ReqParams& req_params,
1612 const Time time,
1613 const Cost cost = 1u) {
1614 return add_request(typename super::RequestRef(new R(request)),
1615 client_id,
1616 req_params,
1617 time,
1618 cost);
1619 }
1620
1621
1622 int add_request(typename super::RequestRef&& request,
1623 const C& client_id,
1624 const ReqParams& req_params,
1625 const Time time,
1626 const Cost cost = 1u) {
1627 typename super::DataGuard g(this->data_mtx);
1628 #ifdef PROFILE
1629 add_request_timer.start();
1630 #endif
1631 int r = super::do_add_request(std::move(request),
1632 client_id,
1633 req_params,
1634 time,
1635 cost);
1636 if (r == 0) {
1637 schedule_request();
1638 }
1639 #ifdef PROFILE
1640 add_request_timer.stop();
1641 #endif
1642 return r;
1643 }
1644
1645
1646 void request_completed() {
1647 typename super::DataGuard g(this->data_mtx);
1648 #ifdef PROFILE
1649 request_complete_timer.start();
1650 #endif
1651 schedule_request();
1652 #ifdef PROFILE
1653 request_complete_timer.stop();
1654 #endif
1655 }
1656
1657 protected:
1658
1659 // data_mtx should be held when called; furthermore, the heap
1660 // should not be empty and the top element of the heap should
1661 // not be already handled
1662 //
1663 // NOTE: the use of "super::ClientRec" in either the template
1664 // construct or as a parameter to submit_top_request generated
1665 // a compiler error in g++ 4.8.4, when ClientRec was
1666 // "protected" rather than "public". By g++ 6.3.1 this was not
1667 // an issue. But for backwards compatibility
1668 // PriorityQueueBase::ClientRec is public.
1669 template<typename C1,
1670 IndIntruHeapData super::ClientRec::*C2,
1671 typename C3,
1672 unsigned B4>
1673 typename super::RequestMeta
1674 submit_top_request(IndIntruHeap<C1,typename super::ClientRec,C2,C3,B4>& heap,
1675 PhaseType phase) {
1676 C client_result;
1677 RequestTag tag = super::pop_process_request(heap,
1678 [this, phase, &client_result]
1679 (const C& client,
1680 const Cost request_cost,
1681 typename super::RequestRef& request) {
1682 client_result = client;
1683 handle_f(client, std::move(request), phase, request_cost);
1684 });
1685 typename super::RequestMeta req(client_result, tag);
1686 return req;
1687 }
1688
1689
1690 // data_mtx should be held when called
1691 void submit_request(typename super::HeapId heap_id) {
1692 switch(heap_id) {
1693 case super::HeapId::reservation:
1694 // don't need to note client
1695 (void) submit_top_request(this->resv_heap, PhaseType::reservation);
1696 // unlike the other two cases, we do not reduce reservation
1697 // tags here
1698 ++this->reserv_sched_count;
1699 break;
1700 case super::HeapId::ready:
1701 {
1702 auto req = submit_top_request(this->ready_heap, PhaseType::priority);
1703 super::reduce_reservation_tags(req.client_id, req.tag);
1704 }
1705 ++this->prop_sched_count;
1706 break;
1707 default:
1708 assert(false);
1709 }
1710 } // submit_request
1711
1712
1713 // data_mtx should be held when called; unfortunately this
1714 // function has to be repeated in both push & pull
1715 // specializations
1716 typename super::NextReq next_request() {
1717 return next_request(get_time());
1718 }
1719
1720
1721 // data_mtx should be held when called; overrides member
1722 // function in base class to add check for whether a request can
1723 // be pushed to the server
1724 typename super::NextReq next_request(Time now) {
1725 if (!can_handle_f()) {
1726 typename super::NextReq result;
1727 result.type = super::NextReqType::none;
1728 return result;
1729 } else {
1730 return super::do_next_request(now);
1731 }
1732 } // next_request
1733
1734
1735 // data_mtx should be held when called
1736 void schedule_request() {
1737 typename super::NextReq next_req = next_request();
1738 switch (next_req.type) {
1739 case super::NextReqType::none:
1740 return;
1741 case super::NextReqType::future:
1742 sched_at(next_req.when_ready);
1743 break;
1744 case super::NextReqType::returning:
1745 submit_request(next_req.heap_id);
1746 break;
1747 default:
1748 assert(false);
1749 }
1750 }
1751
1752
1753 // this is the thread that handles running schedule_request at
1754 // future times when nothing can be scheduled immediately
1755 void run_sched_ahead() {
1756 std::unique_lock<std::mutex> l(sched_ahead_mtx);
1757
1758 while (!this->finishing) {
1759 if (TimeZero == sched_ahead_when) {
1760 sched_ahead_cv.wait(l);
1761 } else {
1762 Time now;
1763 while (!this->finishing && (now = get_time()) < sched_ahead_when) {
1764 long microseconds_l = long(1 + 1000000 * (sched_ahead_when - now));
1765 auto microseconds = std::chrono::microseconds(microseconds_l);
1766 sched_ahead_cv.wait_for(l, microseconds);
1767 }
1768 sched_ahead_when = TimeZero;
1769 if (this->finishing) return;
1770
1771 l.unlock();
1772 if (!this->finishing) {
1773 typename super::DataGuard g(this->data_mtx);
1774 schedule_request();
1775 }
1776 l.lock();
1777 }
1778 }
1779 }
1780
1781
1782 void sched_at(Time when) {
1783 std::lock_guard<std::mutex> l(sched_ahead_mtx);
1784 if (this->finishing) return;
1785 if (TimeZero == sched_ahead_when || when < sched_ahead_when) {
1786 sched_ahead_when = when;
1787 sched_ahead_cv.notify_one();
1788 }
1789 }
1790 }; // class PushPriorityQueue
1791
1792 } // namespace dmclock
1793 } // namespace crimson