1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 * Copyright (C) 2017 Red Hat Inc.
11 /* COMPILATION OPTIONS
13 * By default we include an optimization over the originally published
14 * dmclock algorithm using not the values of rho and delta that were
15 * sent in with a request but instead the most recent rho and delta
16 * values from the request's client. To restore the algorithm's
17 * original behavior, define DO_NOT_DELAY_TAG_CALC (i.e., compiler
18 * argument -DDO_NOT_DELAY_TAG_CALC).
20 * The prop_heap does not seem to be necessary. The only thing it
21 * would help with is quickly finding the minimum proportion/priority
22 * when an idle client became active. To have the code maintain the
23 * proportional heap, define USE_PROP_HEAP (i.e., compiler argument
36 #include <condition_variable>
42 #include <boost/variant.hpp>
44 #include "indirect_intrusive_heap.h"
45 #include "run_every.h"
46 #include "dmclock_util.h"
47 #include "dmclock_recs.h"
58 namespace c
= crimson
;
// Sentinel tag values. Prefer +/-infinity when IEEE-754 doubles are
// available (is_iec559); otherwise fall back to the extreme finite
// values so comparisons against real tags still behave correctly.
constexpr double max_tag = std::numeric_limits<double>::is_iec559 ?
  std::numeric_limits<double>::infinity() :
  std::numeric_limits<double>::max();
constexpr double min_tag = std::numeric_limits<double>::is_iec559 ?
  -std::numeric_limits<double>::infinity() :
  std::numeric_limits<double>::lowest();
// modulus applied when formatting tag times for display (see format_tag)
constexpr uint tag_modulo = 1000000;
69 const double reservation
; // minimum
70 const double weight
; // proportional
71 const double limit
; // maximum
73 // multiplicative inverses of above, which we use in calculations
74 // and don't want to recalculate repeatedly
75 const double reservation_inv
;
76 const double weight_inv
;
77 const double limit_inv
;
79 // order parameters -- min, "normal", max
80 ClientInfo(double _reservation
, double _weight
, double _limit
) :
81 reservation(_reservation
),
84 reservation_inv(0.0 == reservation
? 0.0 : 1.0 / reservation
),
85 weight_inv( 0.0 == weight
? 0.0 : 1.0 / weight
),
86 limit_inv( 0.0 == limit
? 0.0 : 1.0 / limit
)
92 friend std::ostream
& operator<<(std::ostream
& out
,
93 const ClientInfo
& client
) {
95 "{ ClientInfo:: r:" << client
.reservation
<<
96 " w:" << std::fixed
<< client
.weight
<<
97 " l:" << std::fixed
<< client
.limit
<<
98 " 1/r:" << std::fixed
<< client
.reservation_inv
<<
99 " 1/w:" << std::fixed
<< client
.weight_inv
<<
100 " 1/l:" << std::fixed
<< client
.limit_inv
<<
104 }; // class ClientInfo
111 bool ready
; // true when within limit
112 #ifndef DO_NOT_DELAY_TAG_CALC
116 RequestTag(const RequestTag
& prev_tag
,
117 const ClientInfo
& client
,
118 const uint32_t delta
,
121 const double cost
= 0.0) :
122 reservation(cost
+ tag_calc(time
,
123 prev_tag
.reservation
,
124 client
.reservation_inv
,
127 proportion(tag_calc(time
,
138 #ifndef DO_NOT_DELAY_TAG_CALC
142 assert(reservation
< max_tag
|| proportion
< max_tag
);
145 RequestTag(const RequestTag
& prev_tag
,
146 const ClientInfo
& client
,
147 const ReqParams req_params
,
149 const double cost
= 0.0) :
150 RequestTag(prev_tag
, client
, req_params
.delta
, req_params
.rho
, time
, cost
)
153 RequestTag(double _res
, double _prop
, double _lim
, const Time _arrival
) :
158 #ifndef DO_NOT_DELAY_TAG_CALC
162 assert(reservation
< max_tag
|| proportion
< max_tag
);
165 RequestTag(const RequestTag
& other
) :
166 reservation(other
.reservation
),
167 proportion(other
.proportion
),
170 #ifndef DO_NOT_DELAY_TAG_CALC
171 , arrival(other
.arrival
)
177 static std::string
format_tag_change(double before
, double after
) {
178 if (before
== after
) {
179 return std::string("same");
181 std::stringstream ss
;
182 ss
<< format_tag(before
) << "=>" << format_tag(after
);
187 static std::string
format_tag(double value
) {
188 if (max_tag
== value
) {
189 return std::string("max");
190 } else if (min_tag
== value
) {
191 return std::string("min");
193 return format_time(value
, tag_modulo
);
199 static double tag_calc(const Time time
,
202 uint32_t dist_req_val
,
203 bool extreme_is_high
) {
204 if (0.0 == increment
) {
205 return extreme_is_high
? max_tag
: min_tag
;
207 if (0 != dist_req_val
) {
208 increment
*= dist_req_val
;
210 return std::max(time
, prev
+ increment
);
214 friend std::ostream
& operator<<(std::ostream
& out
,
215 const RequestTag
& tag
) {
217 "{ RequestTag:: ready:" << (tag
.ready
? "true" : "false") <<
218 " r:" << format_tag(tag
.reservation
) <<
219 " p:" << format_tag(tag
.proportion
) <<
220 " l:" << format_tag(tag
.limit
) <<
221 #if 0 // try to resolve this to make sure Time is operator<<'able.
222 #ifndef DO_NOT_DELAY_TAG_CALC
223 " arrival:" << tag
.arrival
<<
229 }; // class RequestTag
232 // C is client identifier type, R is request type, B is heap
234 template<typename C
, typename R
, uint B
>
235 class PriorityQueueBase
{
236 // we don't want to include gtest.h just for FRIEND_TEST
237 friend class dmclock_server_client_idle_erase_Test
;
241 using RequestRef
= std::unique_ptr
<R
>;
245 using TimePoint
= decltype(std::chrono::steady_clock::now());
246 using Duration
= std::chrono::milliseconds
;
247 using MarkPoint
= std::pair
<TimePoint
,Counter
>;
249 enum class ReadyOption
{ignore
, lowers
, raises
};
251 // forward decl for friend decls
252 template<double RequestTag::*, ReadyOption
, bool>
253 struct ClientCompare
;
256 friend PriorityQueueBase
;
264 ClientReq(const RequestTag
& _tag
,
266 RequestRef
&& _request
) :
268 client_id(_client_id
),
269 request(std::move(_request
))
274 friend std::ostream
& operator<<(std::ostream
& out
, const ClientReq
& c
) {
275 out
<< "{ ClientReq:: tag:" << c
.tag
<< " client:" <<
279 }; // class ClientReq
283 // NOTE: ClientRec is in the "public" section for compatibility
284 // with g++ 4.8.4, which complains if it's not. By g++ 6.3.1
285 // ClientRec could be "protected" with no issue. [See comments
286 // associated with function submit_top_request.]
288 friend PriorityQueueBase
<C
,R
,B
>;
292 std::deque
<ClientReq
> requests
;
294 // amount added from the proportion tag as a result of
295 // an idle client becoming unidle
296 double prop_delta
= 0.0;
298 c::IndIntruHeapData reserv_heap_data
;
299 c::IndIntruHeapData lim_heap_data
;
300 c::IndIntruHeapData ready_heap_data
;
302 c::IndIntruHeapData prop_heap_data
;
314 const ClientInfo
& _info
,
315 Counter current_tick
) :
317 prev_tag(0.0, 0.0, 0.0, TimeZero
),
320 last_tick(current_tick
),
327 inline const RequestTag
& get_req_tag() const {
331 static inline void assign_unpinned_tag(double& lhs
, const double rhs
) {
332 if (rhs
!= max_tag
&& rhs
!= min_tag
) {
337 inline void update_req_tag(const RequestTag
& _prev
,
338 const Counter
& _tick
) {
339 assign_unpinned_tag(prev_tag
.reservation
, _prev
.reservation
);
340 assign_unpinned_tag(prev_tag
.limit
, _prev
.limit
);
341 assign_unpinned_tag(prev_tag
.proportion
, _prev
.proportion
);
345 inline void add_request(const RequestTag
& tag
,
347 RequestRef
&& request
) {
348 requests
.emplace_back(ClientReq(tag
, client_id
, std::move(request
)));
351 inline const ClientReq
& next_request() const {
352 return requests
.front();
355 inline ClientReq
& next_request() {
356 return requests
.front();
359 inline void pop_request() {
360 requests
.pop_front();
363 inline bool has_request() const {
364 return !requests
.empty();
367 inline size_t request_count() const {
368 return requests
.size();
371 // NB: because a deque is the underlying structure, this
372 // operation might be expensive
373 bool remove_by_req_filter_fw(std::function
<bool(R
&&)> filter_accum
) {
374 bool any_removed
= false;
375 for (auto i
= requests
.begin();
378 if (filter_accum(std::move(*i
->request
))) {
380 i
= requests
.erase(i
);
388 // NB: because a deque is the underlying structure, this
389 // operation might be expensive
390 bool remove_by_req_filter_bw(std::function
<bool(R
&&)> filter_accum
) {
391 bool any_removed
= false;
392 for (auto i
= requests
.rbegin();
393 i
!= requests
.rend();
395 if (filter_accum(std::move(*i
->request
))) {
397 i
= decltype(i
){ requests
.erase(std::next(i
).base()) };
406 remove_by_req_filter(std::function
<bool(R
&&)> filter_accum
,
407 bool visit_backwards
) {
408 if (visit_backwards
) {
409 return remove_by_req_filter_bw(filter_accum
);
411 return remove_by_req_filter_fw(filter_accum
);
416 operator<<(std::ostream
& out
,
417 const typename PriorityQueueBase
<C
,R
,B
>::ClientRec
& e
) {
418 out
<< "{ ClientRec::" <<
419 " client:" << e
.client
<<
420 " prev_tag:" << e
.prev_tag
<<
421 " req_count:" << e
.requests
.size() <<
423 if (e
.has_request()) {
424 out
<< e
.next_request();
432 }; // class ClientRec
434 using ClientRecRef
= std::shared_ptr
<ClientRec
>;
436 // when we try to get the next request, we'll be in one of three
437 // situations -- we'll have one to return, have one that can
438 // fire in the future, or not have any
439 enum class NextReqType
{ returning
, future
, none
};
441 // specifies which queue next request will get popped from
442 enum class HeapId
{ reservation
, ready
};
444 // this is returned from next_req to tell the caller the situation
454 // a function that can be called to look up client information
455 using ClientInfoFunc
= std::function
<ClientInfo(const C
&)>;
459 DataGuard
g(data_mtx
);
460 return (resv_heap
.empty() || ! resv_heap
.top().has_request());
464 size_t client_count() const {
465 DataGuard
g(data_mtx
);
466 return resv_heap
.size();
470 size_t request_count() const {
471 DataGuard
g(data_mtx
);
473 for (auto i
= resv_heap
.cbegin(); i
!= resv_heap
.cend(); ++i
) {
474 total
+= i
->request_count();
480 bool remove_by_req_filter(std::function
<bool(R
&&)> filter_accum
,
481 bool visit_backwards
= false) {
482 bool any_removed
= false;
483 DataGuard
g(data_mtx
);
484 for (auto i
: client_map
) {
486 i
.second
->remove_by_req_filter(filter_accum
, visit_backwards
);
488 resv_heap
.adjust(*i
.second
);
489 limit_heap
.adjust(*i
.second
);
490 ready_heap
.adjust(*i
.second
);
492 prop_heap
.adjust(*i
.second
);
501 // use as a default value when no accumulator is provided
502 static void request_sink(R
&& req
) {
507 void remove_by_client(const C
& client
,
508 bool reverse
= false,
509 std::function
<void (R
&&)> accum
= request_sink
) {
510 DataGuard
g(data_mtx
);
512 auto i
= client_map
.find(client
);
514 if (i
== client_map
.end()) return;
517 for (auto j
= i
->second
->requests
.rbegin();
518 j
!= i
->second
->requests
.rend();
520 accum(std::move(*j
->request
));
523 for (auto j
= i
->second
->requests
.begin();
524 j
!= i
->second
->requests
.end();
526 accum(std::move(*j
->request
));
530 i
->second
->requests
.clear();
532 resv_heap
.adjust(*i
->second
);
533 limit_heap
.adjust(*i
->second
);
534 ready_heap
.adjust(*i
->second
);
536 prop_heap
.adjust(*i
->second
);
541 uint
get_heap_branching_factor() const {
546 friend std::ostream
& operator<<(std::ostream
& out
,
547 const PriorityQueueBase
& q
) {
548 std::lock_guard
<decltype(q
.data_mtx
)> guard(q
.data_mtx
);
550 out
<< "{ PriorityQueue::";
551 for (const auto& c
: q
.client_map
) {
552 out
<< " { client:" << c
.first
<< ", record:" << *c
.second
<<
555 if (!q
.resv_heap
.empty()) {
556 const auto& resv
= q
.resv_heap
.top();
557 out
<< " { reservation_top:" << resv
<< " }";
558 const auto& ready
= q
.ready_heap
.top();
559 out
<< " { ready_top:" << ready
<< " }";
560 const auto& limit
= q
.limit_heap
.top();
561 out
<< " { limit_top:" << limit
<< " }";
563 out
<< " HEAPS-EMPTY";
571 void display_queues(std::ostream
& out
,
572 bool show_res
= true,
573 bool show_lim
= true,
574 bool show_ready
= true,
575 bool show_prop
= true) const {
576 auto filter
= [](const ClientRec
& e
)->bool { return true; };
577 DataGuard
g(data_mtx
);
579 resv_heap
.display_sorted(out
<< "RESER:", filter
);
582 limit_heap
.display_sorted(out
<< "LIMIT:", filter
);
585 ready_heap
.display_sorted(out
<< "READY:", filter
);
589 prop_heap
.display_sorted(out
<< "PROPO:", filter
);
597 // The ClientCompare functor is essentially doing a precedes?
598 // operator, returning true if and only if the first parameter
599 // must precede the second parameter. If the second must precede
600 // the first, or if they are equivalent, false should be
601 // returned. The reason for this behavior is that it will be
602 // called to test if two items are out of order and if true is
603 // returned it will reverse the items. Therefore false is the
604 // default return when it doesn't matter to prevent unnecessary
607 // The template is supporting variations in sorting based on the
608 // heap in question and allowing these variations to be handled
611 // tag_field determines which tag is being used for comparison
613 // ready_opt determines how the ready flag influences the sort
615 // use_prop_delta determines whether the proportional delta is
616 // added in for comparison
617 template<double RequestTag::*tag_field
,
618 ReadyOption ready_opt
,
620 struct ClientCompare
{
621 bool operator()(const ClientRec
& n1
, const ClientRec
& n2
) const {
622 if (n1
.has_request()) {
623 if (n2
.has_request()) {
624 const auto& t1
= n1
.next_request().tag
;
625 const auto& t2
= n2
.next_request().tag
;
626 if (ReadyOption::ignore
== ready_opt
|| t1
.ready
== t2
.ready
) {
627 // if we don't care about ready or the ready values are the same
628 if (use_prop_delta
) {
629 return (t1
.*tag_field
+ n1
.prop_delta
) <
630 (t2
.*tag_field
+ n2
.prop_delta
);
632 return t1
.*tag_field
< t2
.*tag_field
;
634 } else if (ReadyOption::raises
== ready_opt
) {
635 // use_ready == true && the ready fields are different
641 // n1 has request but n2 does not
644 } else if (n2
.has_request()) {
645 // n2 has request but n1 does not
648 // both have none; keep stable w false
654 ClientInfoFunc client_info_f
;
656 mutable std::mutex data_mtx
;
657 using DataGuard
= std::lock_guard
<decltype(data_mtx
)>;
659 // stable mapping between client ids and client queues
660 std::map
<C
,ClientRecRef
> client_map
;
662 c::IndIntruHeap
<ClientRecRef
,
664 &ClientRec::reserv_heap_data
,
665 ClientCompare
<&RequestTag::reservation
,
670 c::IndIntruHeap
<ClientRecRef
,
672 &ClientRec::prop_heap_data
,
673 ClientCompare
<&RequestTag::proportion
,
678 c::IndIntruHeap
<ClientRecRef
,
680 &ClientRec::lim_heap_data
,
681 ClientCompare
<&RequestTag::limit
,
685 c::IndIntruHeap
<ClientRecRef
,
687 &ClientRec::ready_heap_data
,
688 ClientCompare
<&RequestTag::proportion
,
693 // if all reservations are met and all other requests are under
694 // limit, this will allow the request next in terms of
695 // proportion to still get issued
696 bool allow_limit_break
;
698 std::atomic_bool finishing
;
700 // every request creates a tick
703 // performance data collection
704 size_t reserv_sched_count
= 0;
705 size_t prop_sched_count
= 0;
706 size_t limit_break_sched_count
= 0;
711 std::deque
<MarkPoint
> clean_mark_points
;
713 // NB: All threads declared at end, so they're destructed first!
715 std::unique_ptr
<RunEvery
> cleaning_job
;
718 // COMMON constructor that others feed into; we can accept three
719 // different variations of durations
720 template<typename Rep
, typename Per
>
721 PriorityQueueBase(ClientInfoFunc _client_info_f
,
722 std::chrono::duration
<Rep
,Per
> _idle_age
,
723 std::chrono::duration
<Rep
,Per
> _erase_age
,
724 std::chrono::duration
<Rep
,Per
> _check_time
,
725 bool _allow_limit_break
) :
726 client_info_f(_client_info_f
),
727 allow_limit_break(_allow_limit_break
),
729 idle_age(std::chrono::duration_cast
<Duration
>(_idle_age
)),
730 erase_age(std::chrono::duration_cast
<Duration
>(_erase_age
)),
731 check_time(std::chrono::duration_cast
<Duration
>(_check_time
))
733 assert(_erase_age
>= _idle_age
);
734 assert(_check_time
< _idle_age
);
736 std::unique_ptr
<RunEvery
>(
737 new RunEvery(check_time
,
738 std::bind(&PriorityQueueBase::do_clean
, this)));
742 ~PriorityQueueBase() {
747 // data_mtx must be held by caller
748 void do_add_request(RequestRef
&& request
,
750 const ReqParams
& req_params
,
752 const double cost
= 0.0) {
755 // this pointer will help us create a reference to a shared
756 // pointer, no matter which of two codepaths we take
757 ClientRec
* temp_client
;
759 auto client_it
= client_map
.find(client_id
);
760 if (client_map
.end() != client_it
) {
761 temp_client
= &(*client_it
->second
); // address of obj of shared_ptr
763 ClientInfo info
= client_info_f(client_id
);
764 ClientRecRef client_rec
=
765 std::make_shared
<ClientRec
>(client_id
, info
, tick
);
766 resv_heap
.push(client_rec
);
768 prop_heap
.push(client_rec
);
770 limit_heap
.push(client_rec
);
771 ready_heap
.push(client_rec
);
772 client_map
[client_id
] = client_rec
;
773 temp_client
= &(*client_rec
); // address of obj of shared_ptr
776 // for convenience, we'll create a reference to the shared pointer
777 ClientRec
& client
= *temp_client
;
780 // We need to do an adjustment so that idle clients compete
781 // fairly on proportional tags since those tags may have
782 // drifted from real-time. Either use the lowest existing
783 // proportion tag -- O(1) -- or the client with the lowest
784 // previous proportion tag -- O(n) where n = # clients.
786 // So we don't have to maintain a proportional queue that
787 // keeps the minimum on proportional tag alone (we're
788 // instead using a ready queue), we'll have to check each
791 // The alternative would be to maintain a proportional queue
792 // (define USE_PROP_TAG) and do an O(1) operation here.
794 // Was unable to confirm whether equality testing on
795 // std::numeric_limits<double>::max() is guaranteed, so
796 // we'll use a compile-time calculated trigger that is one
797 // third the max, which should be much larger than any
798 // expected organic value.
799 constexpr double lowest_prop_tag_trigger
=
800 std::numeric_limits
<double>::max() / 3.0;
802 double lowest_prop_tag
= std::numeric_limits
<double>::max();
803 for (auto const &c
: client_map
) {
804 // don't use ourselves (or anything else that might be
805 // listed as idle) since we're now in the map
806 if (!c
.second
->idle
) {
808 // use either lowest proportion tag or previous proportion tag
809 if (c
.second
->has_request()) {
810 p
= c
.second
->next_request().tag
.proportion
+
811 c
.second
->prop_delta
;
813 p
= c
.second
->get_req_tag().proportion
+ c
.second
->prop_delta
;
816 if (p
< lowest_prop_tag
) {
822 // if this conditional does not fire, it
823 if (lowest_prop_tag
< lowest_prop_tag_trigger
) {
824 client
.prop_delta
= lowest_prop_tag
- time
;
827 } // if this client was idle
829 #ifndef DO_NOT_DELAY_TAG_CALC
830 RequestTag
tag(0, 0, 0, time
);
832 if (!client
.has_request()) {
833 tag
= RequestTag(client
.get_req_tag(),
839 // copy tag to previous tag for client
840 client
.update_req_tag(tag
, tick
);
843 RequestTag
tag(client
.get_req_tag(), client
.info
, req_params
, time
, cost
);
844 // copy tag to previous tag for client
845 client
.update_req_tag(tag
, tick
);
848 client
.add_request(tag
, client
.client
, std::move(request
));
849 if (1 == client
.requests
.size()) {
850 // NB: can the following 4 calls to adjust be changed
851 // to promote? Can adding a request ever demote a client in the
853 resv_heap
.adjust(client
);
854 limit_heap
.adjust(client
);
855 ready_heap
.adjust(client
);
857 prop_heap
.adjust(client
);
861 client
.cur_rho
= req_params
.rho
;
862 client
.cur_delta
= req_params
.delta
;
864 resv_heap
.adjust(client
);
865 limit_heap
.adjust(client
);
866 ready_heap
.adjust(client
);
868 prop_heap
.adjust(client
);
873 // data_mtx should be held when called; top of heap should have
875 template<typename C1
, IndIntruHeapData
ClientRec::*C2
, typename C3
>
876 void pop_process_request(IndIntruHeap
<C1
, ClientRec
, C2
, C3
, B
>& heap
,
877 std::function
<void(const C
& client
,
878 RequestRef
& request
)> process
) {
879 // gain access to data
880 ClientRec
& top
= heap
.top();
882 RequestRef request
= std::move(top
.next_request().request
);
883 #ifndef DO_NOT_DELAY_TAG_CALC
884 RequestTag tag
= top
.next_request().tag
;
887 // pop request and adjust heaps
890 #ifndef DO_NOT_DELAY_TAG_CALC
891 if (top
.has_request()) {
892 ClientReq
& next_first
= top
.next_request();
893 next_first
.tag
= RequestTag(tag
, top
.info
,
894 top
.cur_delta
, top
.cur_rho
,
895 next_first
.tag
.arrival
);
897 // copy tag to previous tag for client
898 top
.update_req_tag(next_first
.tag
, tick
);
902 resv_heap
.demote(top
);
903 limit_heap
.adjust(top
);
905 prop_heap
.demote(top
);
907 ready_heap
.demote(top
);
910 process(top
.client
, request
);
911 } // pop_process_request
914 // data_mtx should be held when called
915 void reduce_reservation_tags(ClientRec
& client
) {
916 for (auto& r
: client
.requests
) {
917 r
.tag
.reservation
-= client
.info
.reservation_inv
;
919 #ifndef DO_NOT_DELAY_TAG_CALC
920 // reduce only for front tag. because next tags' value are invalid
924 // don't forget to update previous tag
925 client
.prev_tag
.reservation
-= client
.info
.reservation_inv
;
926 resv_heap
.promote(client
);
930 // data_mtx should be held when called
931 void reduce_reservation_tags(const C
& client_id
) {
932 auto client_it
= client_map
.find(client_id
);
934 // means the client was cleaned from map; should never happen
935 // as long as cleaning times are long enough
936 assert(client_map
.end() != client_it
);
937 reduce_reservation_tags(*client_it
->second
);
941 // data_mtx should be held when called
942 NextReq
do_next_request(Time now
) {
945 // if reservation queue is empty, all are empty (i.e., no active clients)
946 if(resv_heap
.empty()) {
947 result
.type
= NextReqType::none
;
951 // try constraint (reservation) based scheduling
953 auto& reserv
= resv_heap
.top();
954 if (reserv
.has_request() &&
955 reserv
.next_request().tag
.reservation
<= now
) {
956 result
.type
= NextReqType::returning
;
957 result
.heap_id
= HeapId::reservation
;
961 // no existing reservations before now, so try weight-based
964 // all items that are within limit are eligible based on
966 auto limits
= &limit_heap
.top();
967 while (limits
->has_request() &&
968 !limits
->next_request().tag
.ready
&&
969 limits
->next_request().tag
.limit
<= now
) {
970 limits
->next_request().tag
.ready
= true;
971 ready_heap
.promote(*limits
);
972 limit_heap
.demote(*limits
);
974 limits
= &limit_heap
.top();
977 auto& readys
= ready_heap
.top();
978 if (readys
.has_request() &&
979 readys
.next_request().tag
.ready
&&
980 readys
.next_request().tag
.proportion
< max_tag
) {
981 result
.type
= NextReqType::returning
;
982 result
.heap_id
= HeapId::ready
;
986 // if nothing is schedulable by reservation or
987 // proportion/weight, and if we allow limit break, try to
988 // schedule something with the lowest proportion tag or
989 // alternatively lowest reservation tag.
990 if (allow_limit_break
) {
991 if (readys
.has_request() &&
992 readys
.next_request().tag
.proportion
< max_tag
) {
993 result
.type
= NextReqType::returning
;
994 result
.heap_id
= HeapId::ready
;
996 } else if (reserv
.has_request() &&
997 reserv
.next_request().tag
.reservation
< max_tag
) {
998 result
.type
= NextReqType::returning
;
999 result
.heap_id
= HeapId::reservation
;
1004 // nothing scheduled; make sure we re-run when next
1005 // reservation item or next limited item comes up
1007 Time next_call
= TimeMax
;
1008 if (resv_heap
.top().has_request()) {
1010 min_not_0_time(next_call
,
1011 resv_heap
.top().next_request().tag
.reservation
);
1013 if (limit_heap
.top().has_request()) {
1014 const auto& next
= limit_heap
.top().next_request();
1015 assert(!next
.tag
.ready
|| max_tag
== next
.tag
.proportion
);
1016 next_call
= min_not_0_time(next_call
, next
.tag
.limit
);
1018 if (next_call
< TimeMax
) {
1019 result
.type
= NextReqType::future
;
1020 result
.when_ready
= next_call
;
1023 result
.type
= NextReqType::none
;
1026 } // do_next_request
1029 // if possible is not zero and less than current then return it;
1030 // otherwise return current; the idea is we're trying to find
1031 // the minimal time but ignoring zero
1032 static inline const Time
& min_not_0_time(const Time
& current
,
1033 const Time
& possible
) {
1034 return TimeZero
== possible
? current
: std::min(current
, possible
);
1039 * This is being called regularly by RunEvery. Every time it's
1040 * called it notes the time and delta counter (mark point) in a
1041 * deque. It also looks at the deque to find the most recent
1042 * mark point that is older than clean_age. It then walks the
1043 * map and deletes all server entries that were last used before
1047 TimePoint now
= std::chrono::steady_clock::now();
1048 DataGuard
g(data_mtx
);
1049 clean_mark_points
.emplace_back(MarkPoint(now
, tick
));
1051 // first erase the super-old client records
1053 Counter erase_point
= 0;
1054 auto point
= clean_mark_points
.front();
1055 while (point
.first
<= now
- erase_age
) {
1056 erase_point
= point
.second
;
1057 clean_mark_points
.pop_front();
1058 point
= clean_mark_points
.front();
1061 Counter idle_point
= 0;
1062 for (auto i
: clean_mark_points
) {
1063 if (i
.first
<= now
- idle_age
) {
1064 idle_point
= i
.second
;
1070 if (erase_point
> 0 || idle_point
> 0) {
1071 for (auto i
= client_map
.begin(); i
!= client_map
.end(); /* empty */) {
1073 if (erase_point
&& i2
->second
->last_tick
<= erase_point
) {
1074 delete_from_heaps(i2
->second
);
1075 client_map
.erase(i2
);
1076 } else if (idle_point
&& i2
->second
->last_tick
<= idle_point
) {
1077 i2
->second
->idle
= true;
1084 // data_mtx must be held by caller
1085 template<IndIntruHeapData
ClientRec::*C1
,typename C2
>
1086 void delete_from_heap(ClientRecRef
& client
,
1087 c::IndIntruHeap
<ClientRecRef
,ClientRec
,C1
,C2
,B
>& heap
) {
1088 auto i
= heap
.rfind(client
);
1093 // data_mtx must be held by caller
1094 void delete_from_heaps(ClientRecRef
& client
) {
1095 delete_from_heap(client
, resv_heap
);
1097 delete_from_heap(client
, prop_heap
);
1099 delete_from_heap(client
, limit_heap
);
1100 delete_from_heap(client
, ready_heap
);
1102 }; // class PriorityQueueBase
1105 template<typename C
, typename R
, uint B
=2>
1106 class PullPriorityQueue
: public PriorityQueueBase
<C
,R
,B
> {
1107 using super
= PriorityQueueBase
<C
,R
,B
>;
1111 // When a request is pulled, this is the return type.
1115 typename
super::RequestRef request
;
1119 typename
super::NextReqType type
;
1120 boost::variant
<Retn
,Time
> data
;
1122 bool is_none() const { return type
== super::NextReqType::none
; }
1124 bool is_retn() const { return type
== super::NextReqType::returning
; }
1126 return boost::get
<Retn
>(data
);
1129 bool is_future() const { return type
== super::NextReqType::future
; }
1130 Time
getTime() const { return boost::get
<Time
>(data
); }
1135 ProfileTimer
<std::chrono::nanoseconds
> pull_request_timer
;
1136 ProfileTimer
<std::chrono::nanoseconds
> add_request_timer
;
1139 template<typename Rep
, typename Per
>
1140 PullPriorityQueue(typename
super::ClientInfoFunc _client_info_f
,
1141 std::chrono::duration
<Rep
,Per
> _idle_age
,
1142 std::chrono::duration
<Rep
,Per
> _erase_age
,
1143 std::chrono::duration
<Rep
,Per
> _check_time
,
1144 bool _allow_limit_break
= false) :
1145 super(_client_info_f
,
1146 _idle_age
, _erase_age
, _check_time
,
1153 // pull convenience constructor
1154 PullPriorityQueue(typename
super::ClientInfoFunc _client_info_f
,
1155 bool _allow_limit_break
= false) :
1156 PullPriorityQueue(_client_info_f
,
1157 std::chrono::minutes(10),
1158 std::chrono::minutes(15),
1159 std::chrono::minutes(6),
1166 inline void add_request(R
&& request
,
1168 const ReqParams
& req_params
,
1169 double addl_cost
= 0.0) {
1170 add_request(typename
super::RequestRef(new R(std::move(request
))),
1178 inline void add_request(R
&& request
,
1180 double addl_cost
= 0.0) {
1181 static const ReqParams null_req_params
;
1182 add_request(typename
super::RequestRef(new R(std::move(request
))),
1191 inline void add_request_time(R
&& request
,
1193 const ReqParams
& req_params
,
1195 double addl_cost
= 0.0) {
1196 add_request(typename
super::RequestRef(new R(std::move(request
))),
1204 inline void add_request(typename
super::RequestRef
&& request
,
1206 const ReqParams
& req_params
,
1207 double addl_cost
= 0.0) {
1208 add_request(request
, req_params
, client_id
, get_time(), addl_cost
);
1212 inline void add_request(typename
super::RequestRef
&& request
,
1214 double addl_cost
= 0.0) {
1215 static const ReqParams null_req_params
;
1216 add_request(request
, null_req_params
, client_id
, get_time(), addl_cost
);
1220 // this does the work; the versions above provide alternate interfaces
1221 void add_request(typename
super::RequestRef
&& request
,
1223 const ReqParams
& req_params
,
1225 double addl_cost
= 0.0) {
1226 typename
super::DataGuard
g(this->data_mtx
);
1228 add_request_timer
.start();
1230 super::do_add_request(std::move(request
),
1235 // no call to schedule_request for pull version
1237 add_request_timer
.stop();
1242 inline PullReq
pull_request() {
1243 return pull_request(get_time());
1247 PullReq
pull_request(Time now
) {
1249 typename
super::DataGuard
g(this->data_mtx
);
1251 pull_request_timer
.start();
1254 typename
super::NextReq next
= super::do_next_request(now
);
1255 result
.type
= next
.type
;
1257 case super::NextReqType::none
:
1259 case super::NextReqType::future
:
1260 result
.data
= next
.when_ready
;
1262 case super::NextReqType::returning
:
1263 // to avoid nesting, break out and let code below handle this case
1269 // we'll only get here if we're returning an entry
1272 [&] (PullReq
& pull_result
, PhaseType phase
) ->
1273 std::function
<void(const C
&,
1274 typename
super::RequestRef
&)> {
1275 return [&pull_result
, phase
](const C
& client
,
1276 typename
super::RequestRef
& request
) {
1278 typename
PullReq::Retn
{client
, std::move(request
), phase
};
1282 switch(next
.heap_id
) {
1283 case super::HeapId::reservation
:
1284 super::pop_process_request(this->resv_heap
,
1285 process_f(result
, PhaseType::reservation
));
1286 ++this->reserv_sched_count
;
1288 case super::HeapId::ready
:
1289 super::pop_process_request(this->ready_heap
,
1290 process_f(result
, PhaseType::priority
));
1291 { // need to use retn temporarily
1292 auto& retn
= boost::get
<typename
PullReq::Retn
>(result
.data
);
1293 super::reduce_reservation_tags(retn
.client
);
1295 ++this->prop_sched_count
;
1302 pull_request_timer
.stop();
1311 // data_mtx should be held when called; unfortunately this
1312 // function has to be repeated in both push & pull
1314 typename
super::NextReq
next_request() {
1315 return next_request(get_time());
1317 }; // class PullPriorityQueue
1321 template<typename C
, typename R
, uint B
=2>
1322 class PushPriorityQueue
: public PriorityQueueBase
<C
,R
,B
> {
1326 using super
= PriorityQueueBase
<C
,R
,B
>;
1330 // a function to see whether the server can handle another request
1331 using CanHandleRequestFunc
= std::function
<bool(void)>;
1333 // a function to submit a request to the server; the second
1334 // parameter is a callback when it's completed
1335 using HandleRequestFunc
=
1336 std::function
<void(const C
&,typename
super::RequestRef
,PhaseType
)>;
1340 CanHandleRequestFunc can_handle_f
;
1341 HandleRequestFunc handle_f
;
1342 // for handling timed scheduling
1343 std::mutex sched_ahead_mtx
;
1344 std::condition_variable sched_ahead_cv
;
1345 Time sched_ahead_when
= TimeZero
;
1349 ProfileTimer
<std::chrono::nanoseconds
> add_request_timer
;
1350 ProfileTimer
<std::chrono::nanoseconds
> request_complete_timer
;
1354 // NB: threads declared last, so constructed last and destructed first
1356 std::thread sched_ahead_thd
;
1360 // push full constructor
1361 template<typename Rep
, typename Per
>
1362 PushPriorityQueue(typename
super::ClientInfoFunc _client_info_f
,
1363 CanHandleRequestFunc _can_handle_f
,
1364 HandleRequestFunc _handle_f
,
1365 std::chrono::duration
<Rep
,Per
> _idle_age
,
1366 std::chrono::duration
<Rep
,Per
> _erase_age
,
1367 std::chrono::duration
<Rep
,Per
> _check_time
,
1368 bool _allow_limit_break
= false) :
1369 super(_client_info_f
,
1370 _idle_age
, _erase_age
, _check_time
,
1373 can_handle_f
= _can_handle_f
;
1374 handle_f
= _handle_f
;
1375 sched_ahead_thd
= std::thread(&PushPriorityQueue::run_sched_ahead
, this);
1379 // push convenience constructor
1380 PushPriorityQueue(typename
super::ClientInfoFunc _client_info_f
,
1381 CanHandleRequestFunc _can_handle_f
,
1382 HandleRequestFunc _handle_f
,
1383 bool _allow_limit_break
= false) :
1384 PushPriorityQueue(_client_info_f
,
1387 std::chrono::minutes(10),
1388 std::chrono::minutes(15),
1389 std::chrono::minutes(6),
1396 ~PushPriorityQueue() {
1397 this->finishing
= true;
1398 sched_ahead_cv
.notify_one();
1399 sched_ahead_thd
.join();
1404 inline void add_request(R
&& request
,
1406 const ReqParams
& req_params
,
1407 double addl_cost
= 0.0) {
1408 add_request(typename
super::RequestRef(new R(std::move(request
))),
1416 inline void add_request(typename
super::RequestRef
&& request
,
1418 const ReqParams
& req_params
,
1419 double addl_cost
= 0.0) {
1420 add_request(request
, req_params
, client_id
, get_time(), addl_cost
);
1424 inline void add_request_time(const R
& request
,
1426 const ReqParams
& req_params
,
1428 double addl_cost
= 0.0) {
1429 add_request(typename
super::RequestRef(new R(request
)),
1437 void add_request(typename
super::RequestRef
&& request
,
1439 const ReqParams
& req_params
,
1441 double addl_cost
= 0.0) {
1442 typename
super::DataGuard
g(this->data_mtx
);
1444 add_request_timer
.start();
1446 super::do_add_request(std::move(request
),
1453 add_request_timer
.stop();
1458 void request_completed() {
1459 typename
super::DataGuard
g(this->data_mtx
);
1461 request_complete_timer
.start();
1465 request_complete_timer
.stop();
1471 // data_mtx should be held when called; furthermore, the heap
1472 // should not be empty and the top element of the heap should
1473 // not be already handled
1475 // NOTE: the use of "super::ClientRec" in either the template
1476 // construct or as a parameter to submit_top_request generated
1477 // a compiler error in g++ 4.8.4, when ClientRec was
1478 // "protected" rather than "public". By g++ 6.3.1 this was not
1479 // an issue. But for backwards compatibility
1480 // PriorityQueueBase::ClientRec is public.
1481 template<typename C1
,
1482 IndIntruHeapData
super::ClientRec::*C2
,
1485 C
submit_top_request(IndIntruHeap
<C1
,typename
super::ClientRec
,C2
,C3
,B4
>& heap
,
1488 super::pop_process_request(heap
,
1489 [this, phase
, &client_result
]
1491 typename
super::RequestRef
& request
) {
1492 client_result
= client
;
1493 handle_f(client
, std::move(request
), phase
);
1495 return client_result
;
1499 // data_mtx should be held when called
1500 void submit_request(typename
super::HeapId heap_id
) {
1503 case super::HeapId::reservation
:
1504 // don't need to note client
1505 (void) submit_top_request(this->resv_heap
, PhaseType::reservation
);
1506 // unlike the other two cases, we do not reduce reservation
1508 ++this->reserv_sched_count
;
1510 case super::HeapId::ready
:
1511 client
= submit_top_request(this->ready_heap
, PhaseType::priority
);
1512 super::reduce_reservation_tags(client
);
1513 ++this->prop_sched_count
;
1521 // data_mtx should be held when called; unfortunately this
1522 // function has to be repeated in both push & pull
1524 typename
super::NextReq
next_request() {
1525 return next_request(get_time());
1529 // data_mtx should be held when called; overrides member
1530 // function in base class to add check for whether a request can
1531 // be pushed to the server
1532 typename
super::NextReq
next_request(Time now
) {
1533 if (!can_handle_f()) {
1534 typename
super::NextReq result
;
1535 result
.type
= super::NextReqType::none
;
1538 return super::do_next_request(now
);
1543 // data_mtx should be held when called
1544 void schedule_request() {
1545 typename
super::NextReq next_req
= next_request();
1546 switch (next_req
.type
) {
1547 case super::NextReqType::none
:
1549 case super::NextReqType::future
:
1550 sched_at(next_req
.when_ready
);
1552 case super::NextReqType::returning
:
1553 submit_request(next_req
.heap_id
);
1561 // this is the thread that handles running schedule_request at
1562 // future times when nothing can be scheduled immediately
1563 void run_sched_ahead() {
1564 std::unique_lock
<std::mutex
> l(sched_ahead_mtx
);
1566 while (!this->finishing
) {
1567 if (TimeZero
== sched_ahead_when
) {
1568 sched_ahead_cv
.wait(l
);
1571 while (!this->finishing
&& (now
= get_time()) < sched_ahead_when
) {
1572 long microseconds_l
= long(1 + 1000000 * (sched_ahead_when
- now
));
1573 auto microseconds
= std::chrono::microseconds(microseconds_l
);
1574 sched_ahead_cv
.wait_for(l
, microseconds
);
1576 sched_ahead_when
= TimeZero
;
1577 if (this->finishing
) return;
1580 if (!this->finishing
) {
1581 typename
super::DataGuard
g(this->data_mtx
);
1590 void sched_at(Time when
) {
1591 std::lock_guard
<std::mutex
> l(sched_ahead_mtx
);
1592 if (this->finishing
) return;
1593 if (TimeZero
== sched_ahead_when
|| when
< sched_ahead_when
) {
1594 sched_ahead_when
= when
;
1595 sched_ahead_cv
.notify_one();
1598 }; // class PushPriorityQueue
1600 } // namespace dmclock
1601 } // namespace crimson