1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 * Copyright (C) 2017 Red Hat Inc.
7 * Author: J. Eric Ivancich <ivancich@redhat.com>
9 * This is free software; you can redistribute it and/or modify it
10 * under the terms of the GNU Lesser General Public License version
11 * 2.1, as published by the Free Software Foundation. See file
18 /* COMPILATION OPTIONS
20 * The prop_heap does not seem to be necessary. The only thing it
21 * would help with is quickly finding the minimum proportion/priority
22 * when an idle client became active. To have the code maintain the
23 * proportional heap, define USE_PROP_HEAP (i.e., compiler argument
36 #include <condition_variable>
42 #include <boost/variant.hpp>
44 #include "indirect_intrusive_heap.h"
45 #include "../support/src/run_every.h"
46 #include "dmclock_util.h"
47 #include "dmclock_recs.h"
58 namespace c
= crimson
;
// Sentinel tag values. When IEEE-754 (IEC 559) doubles are available
// we use +/-infinity so that any finite tag compares below/above the
// sentinel; otherwise fall back to the extreme finite doubles.
constexpr double max_tag =
  std::numeric_limits<double>::is_iec559
  ? std::numeric_limits<double>::infinity()
  : std::numeric_limits<double>::max();
constexpr double min_tag =
  std::numeric_limits<double>::is_iec559
  ? -std::numeric_limits<double>::infinity()
  : std::numeric_limits<double>::lowest();

// Modulus applied when formatting a tag for display (see format_tag,
// which passes it to format_time).
constexpr unsigned tag_modulo = 1000000;

// Client-record aging defaults: a client is marked idle once it has
// been inactive for standard_idle_age and erased after
// standard_erase_age. The cleaning job normally runs every
// standard_check_time, switching to aggressive_check_time when a
// pass erased the per-pass maximum (standard_erase_max) and likely
// has more work to do.
constexpr auto standard_idle_age = std::chrono::seconds(300);
constexpr auto standard_erase_age = std::chrono::seconds(600);
constexpr auto standard_check_time = std::chrono::seconds(60);
constexpr auto aggressive_check_time = std::chrono::seconds(5);
constexpr unsigned standard_erase_max = 2000;
75 // requests are delayed until the limit is restored
77 // requests are allowed to exceed their limit, if all other reservations
78 // are met and below their limits
80 // if an incoming request would exceed its limit, add_request() will
81 // reject it with EAGAIN instead of adding it to the queue. cannot be used
82 // with DelayedTagCalc, because add_request() needs an accurate tag
86 // when AtLimit::Reject is used, only start rejecting requests once their
87 // limit is above this threshold. requests under this threshold are
88 // enqueued and processed like AtLimit::Wait
89 using RejectThreshold
= Time
;
91 // the AtLimit constructor parameter can either accept AtLimit or a value
92 // for RejectThreshold (which implies AtLimit::Reject)
93 using AtLimitParam
= boost::variant
<AtLimit
, RejectThreshold
>;
96 double reservation
; // minimum
97 double weight
; // proportional
98 double limit
; // maximum
100 // multiplicative inverses of above, which we use in calculations
101 // and don't want to recalculate repeatedly
102 double reservation_inv
;
106 // order parameters -- min, "normal", max
107 ClientInfo(double _reservation
, double _weight
, double _limit
) {
108 update(_reservation
, _weight
, _limit
);
111 inline void update(double _reservation
, double _weight
, double _limit
) {
112 reservation
= _reservation
;
115 reservation_inv
= (0.0 == reservation
) ? 0.0 : 1.0 / reservation
;
116 weight_inv
= (0.0 == weight
) ? 0.0 : 1.0 / weight
;
117 limit_inv
= (0.0 == limit
) ? 0.0 : 1.0 / limit
;
120 friend std::ostream
& operator<<(std::ostream
& out
,
121 const ClientInfo
& client
) {
123 "{ ClientInfo:: r:" << client
.reservation
<<
124 " w:" << std::fixed
<< client
.weight
<<
125 " l:" << std::fixed
<< client
.limit
<<
126 " 1/r:" << std::fixed
<< client
.reservation_inv
<<
127 " 1/w:" << std::fixed
<< client
.weight_inv
<<
128 " 1/l:" << std::fixed
<< client
.limit_inv
<<
132 }; // class ClientInfo
142 bool ready
; // true when within limit
145 RequestTag(const RequestTag
& prev_tag
,
146 const ClientInfo
& client
,
147 const uint32_t _delta
,
150 const Cost _cost
= 1u,
151 const double anticipation_timeout
= 0.0) :
159 Time max_time
= time
;
160 if (time
- anticipation_timeout
< prev_tag
.arrival
)
161 max_time
-= anticipation_timeout
;
163 reservation
= tag_calc(max_time
,
164 prev_tag
.reservation
,
165 client
.reservation_inv
,
169 proportion
= tag_calc(max_time
,
175 limit
= tag_calc(max_time
,
182 assert(reservation
< max_tag
|| proportion
< max_tag
);
185 RequestTag(const RequestTag
& prev_tag
,
186 const ClientInfo
& client
,
187 const ReqParams req_params
,
189 const Cost cost
= 1u,
190 const double anticipation_timeout
= 0.0) :
191 RequestTag(prev_tag
, client
, req_params
.delta
, req_params
.rho
, time
,
192 cost
, anticipation_timeout
)
195 RequestTag(const double _res
, const double _prop
, const double _lim
,
197 const uint32_t _delta
= 0,
198 const uint32_t _rho
= 0,
199 const Cost _cost
= 1u) :
210 assert(reservation
< max_tag
|| proportion
< max_tag
);
213 RequestTag(const RequestTag
& other
) :
214 reservation(other
.reservation
),
215 proportion(other
.proportion
),
221 arrival(other
.arrival
)
224 static std::string
format_tag_change(double before
, double after
) {
225 if (before
== after
) {
226 return std::string("same");
228 std::stringstream ss
;
229 ss
<< format_tag(before
) << "=>" << format_tag(after
);
234 static std::string
format_tag(double value
) {
235 if (max_tag
== value
) {
236 return std::string("max");
237 } else if (min_tag
== value
) {
238 return std::string("min");
240 return format_time(value
, tag_modulo
);
246 static double tag_calc(const Time time
,
248 const double increment
,
249 const uint32_t dist_req_val
,
250 const bool extreme_is_high
,
252 if (0.0 == increment
) {
253 return extreme_is_high
? max_tag
: min_tag
;
255 // ensure 64-bit arithmetic before conversion to double
256 double tag_increment
= increment
* (uint64_t(dist_req_val
) + cost
);
257 return std::max(time
, prev
+ tag_increment
);
261 friend std::ostream
& operator<<(std::ostream
& out
,
262 const RequestTag
& tag
) {
264 "{ RequestTag:: ready:" << (tag
.ready
? "true" : "false") <<
265 " r:" << format_tag(tag
.reservation
) <<
266 " p:" << format_tag(tag
.proportion
) <<
267 " l:" << format_tag(tag
.limit
) <<
268 #if 0 // try to resolve this to make sure Time is operator<<'able.
269 " arrival:" << tag
.arrival
<<
274 }; // class RequestTag
276 // C is client identifier type, R is request type,
277 // IsDelayed controls whether tag calculation is delayed until the request
278 // reaches the front of its queue. This is an optimization over the
279 // originally published dmclock algorithm, allowing it to use the most
280 // recent values of rho and delta.
281 // U1 determines whether to use client information function dynamically,
282 // B is heap branching factor
283 template<typename C
, typename R
, bool IsDelayed
, bool U1
, unsigned B
>
284 class PriorityQueueBase
{
285 // we don't want to include gtest.h just for FRIEND_TEST
286 friend class dmclock_server_client_idle_erase_Test
;
288 // types used for tag dispatch to select between implementations
289 using TagCalc
= std::integral_constant
<bool, IsDelayed
>;
290 using DelayedTagCalc
= std::true_type
;
291 using ImmediateTagCalc
= std::false_type
;
295 using RequestRef
= std::unique_ptr
<R
>;
299 using Clock
= std::chrono::steady_clock
;
300 using TimePoint
= Clock::time_point
;
301 using Duration
= std::chrono::milliseconds
;
302 using MarkPoint
= std::pair
<TimePoint
,Counter
>;
304 enum class ReadyOption
{ignore
, lowers
, raises
};
306 // forward decl for friend decls
307 template<double RequestTag::*, ReadyOption
, bool>
308 struct ClientCompare
;
311 friend PriorityQueueBase
;
319 ClientReq(const RequestTag
& _tag
,
321 RequestRef
&& _request
) :
323 client_id(_client_id
),
324 request(std::move(_request
))
329 friend std::ostream
& operator<<(std::ostream
& out
, const ClientReq
& c
) {
330 out
<< "{ ClientReq:: tag:" << c
.tag
<< " client:" <<
334 }; // class ClientReq
340 RequestMeta(const C
& _client_id
, const RequestTag
& _tag
) :
341 client_id(_client_id
),
350 // NOTE: ClientRec is in the "public" section for compatibility
351 // with g++ 4.8.4, which complains if it's not. By g++ 6.3.1
352 // ClientRec could be "protected" with no issue. [See comments
353 // associated with function submit_top_request.]
355 friend PriorityQueueBase
<C
,R
,IsDelayed
,U1
,B
>;
359 std::deque
<ClientReq
> requests
;
361 // amount added from the proportion tag as a result of
362 // an idle client becoming unidle
363 double prop_delta
= 0.0;
365 c::IndIntruHeapData reserv_heap_data
{};
366 c::IndIntruHeapData lim_heap_data
{};
367 c::IndIntruHeapData ready_heap_data
{};
369 c::IndIntruHeapData prop_heap_data
{};
374 const ClientInfo
* info
;
381 const ClientInfo
* _info
,
382 Counter current_tick
) :
384 prev_tag(0.0, 0.0, 0.0, TimeZero
),
387 last_tick(current_tick
),
394 inline const RequestTag
& get_req_tag() const {
398 static inline void assign_unpinned_tag(double& lhs
, const double rhs
) {
399 if (rhs
!= max_tag
&& rhs
!= min_tag
) {
404 inline void update_req_tag(const RequestTag
& _prev
,
405 const Counter
& _tick
) {
406 assign_unpinned_tag(prev_tag
.reservation
, _prev
.reservation
);
407 assign_unpinned_tag(prev_tag
.limit
, _prev
.limit
);
408 assign_unpinned_tag(prev_tag
.proportion
, _prev
.proportion
);
409 prev_tag
.arrival
= _prev
.arrival
;
413 inline void add_request(const RequestTag
& tag
, RequestRef
&& request
) {
414 requests
.emplace_back(tag
, client
, std::move(request
));
417 inline const ClientReq
& next_request() const {
418 return requests
.front();
421 inline ClientReq
& next_request() {
422 return requests
.front();
425 inline void pop_request() {
426 requests
.pop_front();
429 inline bool has_request() const {
430 return !requests
.empty();
433 inline size_t request_count() const {
434 return requests
.size();
437 // NB: because a deque is the underlying structure, this
438 // operation might be expensive
439 bool remove_by_req_filter_fw(std::function
<bool(RequestRef
&&)> filter_accum
) {
440 bool any_removed
= false;
441 for (auto i
= requests
.begin();
444 if (filter_accum(std::move(i
->request
))) {
446 i
= requests
.erase(i
);
454 // NB: because a deque is the underlying structure, this
455 // operation might be expensive
456 bool remove_by_req_filter_bw(std::function
<bool(RequestRef
&&)> filter_accum
) {
457 bool any_removed
= false;
458 for (auto i
= requests
.rbegin();
459 i
!= requests
.rend();
461 if (filter_accum(std::move(i
->request
))) {
463 i
= decltype(i
){ requests
.erase(std::next(i
).base()) };
472 remove_by_req_filter(std::function
<bool(RequestRef
&&)> filter_accum
,
473 bool visit_backwards
) {
474 if (visit_backwards
) {
475 return remove_by_req_filter_bw(filter_accum
);
477 return remove_by_req_filter_fw(filter_accum
);
482 operator<<(std::ostream
& out
,
483 const typename
PriorityQueueBase::ClientRec
& e
) {
484 out
<< "{ ClientRec::" <<
485 " client:" << e
.client
<<
486 " prev_tag:" << e
.prev_tag
<<
487 " req_count:" << e
.requests
.size() <<
489 if (e
.has_request()) {
490 out
<< e
.next_request();
498 }; // class ClientRec
500 using ClientRecRef
= std::shared_ptr
<ClientRec
>;
502 // when we try to get the next request, we'll be in one of three
503 // situations -- we'll have one to return, have one that can
504 // fire in the future, or not have any
505 enum class NextReqType
{ returning
, future
, none
};
507 // specifies which queue next request will get popped from
508 enum class HeapId
{ reservation
, ready
};
510 // this is returned from next_req to tell the caller the situation
518 inline explicit NextReq() :
519 type(NextReqType::none
)
522 inline NextReq(HeapId _heap_id
) :
523 type(NextReqType::returning
),
527 inline NextReq(Time _when_ready
) :
528 type(NextReqType::future
),
529 when_ready(_when_ready
)
532 // calls to this are clearer than calls to the default
534 static inline NextReq
none() {
540 // a function that can be called to look up client information
541 using ClientInfoFunc
= std::function
<const ClientInfo
*(const C
&)>;
545 DataGuard
g(data_mtx
);
546 return (resv_heap
.empty() || ! resv_heap
.top().has_request());
550 size_t client_count() const {
551 DataGuard
g(data_mtx
);
552 return resv_heap
.size();
556 size_t request_count() const {
557 DataGuard
g(data_mtx
);
559 for (auto i
= resv_heap
.cbegin(); i
!= resv_heap
.cend(); ++i
) {
560 total
+= i
->request_count();
566 bool remove_by_req_filter(std::function
<bool(RequestRef
&&)> filter_accum
,
567 bool visit_backwards
= false) {
568 bool any_removed
= false;
569 DataGuard
g(data_mtx
);
570 for (auto i
: client_map
) {
572 i
.second
->remove_by_req_filter(filter_accum
, visit_backwards
);
574 resv_heap
.adjust(*i
.second
);
575 limit_heap
.adjust(*i
.second
);
576 ready_heap
.adjust(*i
.second
);
578 prop_heap
.adjust(*i
.second
);
587 // use as a default value when no accumulator is provided
588 static void request_sink(RequestRef
&& req
) {
593 void remove_by_client(const C
& client
,
594 bool reverse
= false,
595 std::function
<void (RequestRef
&&)> accum
= request_sink
) {
596 DataGuard
g(data_mtx
);
598 auto i
= client_map
.find(client
);
600 if (i
== client_map
.end()) return;
603 for (auto j
= i
->second
->requests
.rbegin();
604 j
!= i
->second
->requests
.rend();
606 accum(std::move(j
->request
));
609 for (auto j
= i
->second
->requests
.begin();
610 j
!= i
->second
->requests
.end();
612 accum(std::move(j
->request
));
616 i
->second
->requests
.clear();
618 resv_heap
.adjust(*i
->second
);
619 limit_heap
.adjust(*i
->second
);
620 ready_heap
.adjust(*i
->second
);
622 prop_heap
.adjust(*i
->second
);
627 unsigned get_heap_branching_factor() const {
632 void update_client_info(const C
& client_id
) {
633 DataGuard
g(data_mtx
);
634 auto client_it
= client_map
.find(client_id
);
635 if (client_map
.end() != client_it
) {
636 ClientRec
& client
= (*client_it
->second
);
637 client
.info
= client_info_f(client_id
);
642 void update_client_infos() {
643 DataGuard
g(data_mtx
);
644 for (auto i
: client_map
) {
645 i
.second
->info
= client_info_f(i
.second
->client
);
650 friend std::ostream
& operator<<(std::ostream
& out
,
651 const PriorityQueueBase
& q
) {
652 std::lock_guard
<decltype(q
.data_mtx
)> guard(q
.data_mtx
);
654 out
<< "{ PriorityQueue::";
655 for (const auto& c
: q
.client_map
) {
656 out
<< " { client:" << c
.first
<< ", record:" << *c
.second
<<
659 if (!q
.resv_heap
.empty()) {
660 const auto& resv
= q
.resv_heap
.top();
661 out
<< " { reservation_top:" << resv
<< " }";
662 const auto& ready
= q
.ready_heap
.top();
663 out
<< " { ready_top:" << ready
<< " }";
664 const auto& limit
= q
.limit_heap
.top();
665 out
<< " { limit_top:" << limit
<< " }";
667 out
<< " HEAPS-EMPTY";
675 void display_queues(std::ostream
& out
,
676 bool show_res
= true,
677 bool show_lim
= true,
678 bool show_ready
= true,
679 bool show_prop
= true) const {
680 auto filter
= [](const ClientRec
& e
)->bool { return true; };
681 DataGuard
g(data_mtx
);
683 resv_heap
.display_sorted(out
<< "RESER:", filter
);
686 limit_heap
.display_sorted(out
<< "LIMIT:", filter
);
689 ready_heap
.display_sorted(out
<< "READY:", filter
);
693 prop_heap
.display_sorted(out
<< "PROPO:", filter
);
701 // The ClientCompare functor is essentially doing a precedes?
702 // operator, returning true if and only if the first parameter
703 // must precede the second parameter. If the second must precede
704 // the first, or if they are equivalent, false should be
705 // returned. The reason for this behavior is that it will be
706 // called to test if two items are out of order and if true is
707 // returned it will reverse the items. Therefore false is the
708 // default return when it doesn't matter to prevent unnecessary
711 // The template is supporting variations in sorting based on the
712 // heap in question and allowing these variations to be handled
715 // tag_field determines which tag is being used for comparison
717 // ready_opt determines how the ready flag influences the sort
719 // use_prop_delta determines whether the proportional delta is
720 // added in for comparison
721 template<double RequestTag::*tag_field
,
722 ReadyOption ready_opt
,
724 struct ClientCompare
{
725 bool operator()(const ClientRec
& n1
, const ClientRec
& n2
) const {
726 if (n1
.has_request()) {
727 if (n2
.has_request()) {
728 const auto& t1
= n1
.next_request().tag
;
729 const auto& t2
= n2
.next_request().tag
;
730 if (ReadyOption::ignore
== ready_opt
|| t1
.ready
== t2
.ready
) {
731 // if we don't care about ready or the ready values are the same
732 if (use_prop_delta
) {
733 return (t1
.*tag_field
+ n1
.prop_delta
) <
734 (t2
.*tag_field
+ n2
.prop_delta
);
736 return t1
.*tag_field
< t2
.*tag_field
;
738 } else if (ReadyOption::raises
== ready_opt
) {
739 // use_ready == true && the ready fields are different
745 // n1 has request but n2 does not
748 } else if (n2
.has_request()) {
749 // n2 has request but n1 does not
752 // both have none; keep stable w false
758 ClientInfoFunc client_info_f
;
759 static constexpr bool is_dynamic_cli_info_f
= U1
;
761 mutable std::mutex data_mtx
;
762 using DataGuard
= std::lock_guard
<decltype(data_mtx
)>;
764 // stable mapping between client ids and client queues
765 std::map
<C
,ClientRecRef
> client_map
;
767 c::IndIntruHeap
<ClientRecRef
,
769 &ClientRec::reserv_heap_data
,
770 ClientCompare
<&RequestTag::reservation
,
775 c::IndIntruHeap
<ClientRecRef
,
777 &ClientRec::prop_heap_data
,
778 ClientCompare
<&RequestTag::proportion
,
783 c::IndIntruHeap
<ClientRecRef
,
785 &ClientRec::lim_heap_data
,
786 ClientCompare
<&RequestTag::limit
,
790 c::IndIntruHeap
<ClientRecRef
,
792 &ClientRec::ready_heap_data
,
793 ClientCompare
<&RequestTag::proportion
,
799 RejectThreshold reject_threshold
= 0;
801 double anticipation_timeout
;
803 std::atomic_bool finishing
;
805 // every request creates a tick
808 // performance data collection
809 size_t reserv_sched_count
= 0;
810 size_t prop_sched_count
= 0;
811 size_t limit_break_sched_count
= 0;
816 std::deque
<MarkPoint
> clean_mark_points
;
817 // max number of clients to erase at a time
819 // unfinished last erase point
820 Counter last_erase_point
= 0;
822 // NB: All threads declared at end, so they're destructed first!
824 std::unique_ptr
<RunEvery
> cleaning_job
;
826 // helper function to return the value of a variant if it matches the
827 // given type T, or a default value of T otherwise
828 template <typename T
, typename Variant
>
829 static T
get_or_default(const Variant
& param
, T default_value
) {
830 const T
*p
= boost::get
<T
>(¶m
);
831 return p
? *p
: default_value
;
834 // COMMON constructor that others feed into; we can accept three
835 // different variations of durations
836 template<typename Rep
, typename Per
>
837 PriorityQueueBase(ClientInfoFunc _client_info_f
,
838 std::chrono::duration
<Rep
,Per
> _idle_age
,
839 std::chrono::duration
<Rep
,Per
> _erase_age
,
840 std::chrono::duration
<Rep
,Per
> _check_time
,
841 AtLimitParam at_limit_param
,
842 double _anticipation_timeout
) :
843 client_info_f(_client_info_f
),
844 at_limit(get_or_default(at_limit_param
, AtLimit::Reject
)),
845 reject_threshold(get_or_default(at_limit_param
, RejectThreshold
{0})),
846 anticipation_timeout(_anticipation_timeout
),
848 idle_age(std::chrono::duration_cast
<Duration
>(_idle_age
)),
849 erase_age(std::chrono::duration_cast
<Duration
>(_erase_age
)),
850 check_time(std::chrono::duration_cast
<Duration
>(_check_time
)),
851 erase_max(standard_erase_max
)
853 assert(_erase_age
>= _idle_age
);
854 assert(_check_time
< _idle_age
);
855 // AtLimit::Reject depends on ImmediateTagCalc
856 assert(at_limit
!= AtLimit::Reject
|| !IsDelayed
);
858 std::unique_ptr
<RunEvery
>(
859 new RunEvery(check_time
,
860 std::bind(&PriorityQueueBase::do_clean
, this)));
864 ~PriorityQueueBase() {
869 inline const ClientInfo
* get_cli_info(ClientRec
& client
) const {
870 if (is_dynamic_cli_info_f
) {
871 client
.info
= client_info_f(client
.client
);
876 // data_mtx must be held by caller
877 RequestTag
initial_tag(DelayedTagCalc delayed
, ClientRec
& client
,
878 const ReqParams
& params
, Time time
, Cost cost
) {
879 RequestTag
tag(0, 0, 0, time
, 0, 0, cost
);
881 // only calculate a tag if the request is going straight to the front
882 if (!client
.has_request()) {
883 const ClientInfo
* client_info
= get_cli_info(client
);
885 tag
= RequestTag(client
.get_req_tag(), *client_info
,
886 params
, time
, cost
, anticipation_timeout
);
888 // copy tag to previous tag for client
889 client
.update_req_tag(tag
, tick
);
894 // data_mtx must be held by caller
895 RequestTag
initial_tag(ImmediateTagCalc imm
, ClientRec
& client
,
896 const ReqParams
& params
, Time time
, Cost cost
) {
897 // calculate the tag unconditionally
898 const ClientInfo
* client_info
= get_cli_info(client
);
900 RequestTag
tag(client
.get_req_tag(), *client_info
,
901 params
, time
, cost
, anticipation_timeout
);
903 // copy tag to previous tag for client
904 client
.update_req_tag(tag
, tick
);
908 // data_mtx must be held by caller. returns 0 on success. when using
909 // AtLimit::Reject, requests that would exceed their limit are rejected
910 // with EAGAIN, and the queue will not take ownership of the given
911 // 'request' argument
912 int do_add_request(RequestRef
&& request
,
914 const ReqParams
& req_params
,
916 const Cost cost
= 1u) {
919 auto insert
= client_map
.emplace(client_id
, ClientRecRef
{});
922 const ClientInfo
* info
= client_info_f(client_id
);
923 auto client_rec
= std::make_shared
<ClientRec
>(client_id
, info
, tick
);
924 resv_heap
.push(client_rec
);
926 prop_heap
.push(client_rec
);
928 limit_heap
.push(client_rec
);
929 ready_heap
.push(client_rec
);
930 insert
.first
->second
= std::move(client_rec
);
933 // for convenience, we'll create a reference to the shared pointer
934 ClientRec
& client
= *insert
.first
->second
;
937 // We need to do an adjustment so that idle clients compete
938 // fairly on proportional tags since those tags may have
939 // drifted from real-time. Either use the lowest existing
940 // proportion tag -- O(1) -- or the client with the lowest
941 // previous proportion tag -- O(n) where n = # clients.
943 // So we don't have to maintain a proportional queue that
944 // keeps the minimum on proportional tag alone (we're
945 // instead using a ready queue), we'll have to check each
948 // The alternative would be to maintain a proportional queue
949 // (define USE_PROP_TAG) and do an O(1) operation here.
951 // Was unable to confirm whether equality testing on
952 // std::numeric_limits<double>::max() is guaranteed, so
953 // we'll use a compile-time calculated trigger that is one
954 // third the max, which should be much larger than any
955 // expected organic value.
956 constexpr double lowest_prop_tag_trigger
=
957 std::numeric_limits
<double>::max() / 3.0;
959 double lowest_prop_tag
= std::numeric_limits
<double>::max();
960 for (auto const &c
: client_map
) {
961 // don't use ourselves (or anything else that might be
962 // listed as idle) since we're now in the map
963 if (!c
.second
->idle
) {
965 // use either lowest proportion tag or previous proportion tag
966 if (c
.second
->has_request()) {
967 p
= c
.second
->next_request().tag
.proportion
+
968 c
.second
->prop_delta
;
970 p
= c
.second
->get_req_tag().proportion
+ c
.second
->prop_delta
;
973 if (p
< lowest_prop_tag
) {
979 // if this conditional does not fire, it
980 if (lowest_prop_tag
< lowest_prop_tag_trigger
) {
981 client
.prop_delta
= lowest_prop_tag
- time
;
984 } // if this client was idle
986 RequestTag tag
= initial_tag(TagCalc
{}, client
, req_params
, time
, cost
);
988 if (at_limit
== AtLimit::Reject
&&
989 tag
.limit
> time
+ reject_threshold
) {
990 // if the client is over its limit, reject it here
994 client
.add_request(tag
, std::move(request
));
995 if (1 == client
.requests
.size()) {
996 // NB: can the following 4 calls to adjust be changed
997 // promote? Can adding a request ever demote a client in the
999 resv_heap
.adjust(client
);
1000 limit_heap
.adjust(client
);
1001 ready_heap
.adjust(client
);
1003 prop_heap
.adjust(client
);
1007 client
.cur_rho
= req_params
.rho
;
1008 client
.cur_delta
= req_params
.delta
;
1010 resv_heap
.adjust(client
);
1011 limit_heap
.adjust(client
);
1012 ready_heap
.adjust(client
);
1014 prop_heap
.adjust(client
);
1019 // data_mtx must be held by caller
1020 void update_next_tag(DelayedTagCalc delayed
, ClientRec
& top
,
1021 const RequestTag
& tag
) {
1022 if (top
.has_request()) {
1023 // perform delayed tag calculation on the next request
1024 ClientReq
& next_first
= top
.next_request();
1025 const ClientInfo
* client_info
= get_cli_info(top
);
1026 assert(client_info
);
1027 next_first
.tag
= RequestTag(tag
, *client_info
,
1028 top
.cur_delta
, top
.cur_rho
,
1029 next_first
.tag
.arrival
,
1030 next_first
.tag
.cost
,
1031 anticipation_timeout
);
1032 // copy tag to previous tag for client
1033 top
.update_req_tag(next_first
.tag
, tick
);
1037 void update_next_tag(ImmediateTagCalc imm
, ClientRec
& top
,
1038 const RequestTag
& tag
) {
1039 // the next tag was already calculated on insertion
1042 // data_mtx should be held when called; top of heap should have
1044 template<typename C1
, IndIntruHeapData
ClientRec::*C2
, typename C3
>
1045 RequestTag
pop_process_request(IndIntruHeap
<C1
, ClientRec
, C2
, C3
, B
>& heap
,
1046 std::function
<void(const C
& client
,
1048 RequestRef
& request
)> process
) {
1049 // gain access to data
1050 ClientRec
& top
= heap
.top();
1052 Cost request_cost
= top
.next_request().tag
.cost
;
1053 RequestRef request
= std::move(top
.next_request().request
);
1054 RequestTag tag
= top
.next_request().tag
;
1056 // pop request and adjust heaps
1059 update_next_tag(TagCalc
{}, top
, tag
);
1061 resv_heap
.demote(top
);
1062 limit_heap
.adjust(top
);
1064 prop_heap
.demote(top
);
1066 ready_heap
.demote(top
);
1069 process(top
.client
, request_cost
, request
);
1072 } // pop_process_request
1075 // data_mtx must be held by caller
1076 void reduce_reservation_tags(DelayedTagCalc delayed
, ClientRec
& client
,
1077 const RequestTag
& tag
) {
1078 if (!client
.requests
.empty()) {
1079 // only maintain a tag for the first request
1080 auto& r
= client
.requests
.front();
1081 r
.tag
.reservation
-=
1082 client
.info
->reservation_inv
* std::max(uint32_t(1), tag
.rho
);
1086 // data_mtx should be held when called
1087 void reduce_reservation_tags(ImmediateTagCalc imm
, ClientRec
& client
,
1088 const RequestTag
& tag
) {
1090 client
.info
->reservation_inv
* std::max(uint32_t(1), tag
.rho
);
1091 for (auto& r
: client
.requests
) {
1092 r
.tag
.reservation
-= res_offset
;
1096 // data_mtx should be held when called
1097 void reduce_reservation_tags(const C
& client_id
, const RequestTag
& tag
) {
1098 auto client_it
= client_map
.find(client_id
);
1100 // means the client was cleaned from map; should never happen
1101 // as long as cleaning times are long enough
1102 assert(client_map
.end() != client_it
);
1103 ClientRec
& client
= *client_it
->second
;
1104 reduce_reservation_tags(TagCalc
{}, client
, tag
);
1106 // don't forget to update previous tag
1107 client
.prev_tag
.reservation
-=
1108 client
.info
->reservation_inv
* std::max(uint32_t(1), tag
.rho
);
1109 resv_heap
.promote(client
);
1113 // data_mtx should be held when called
1114 NextReq
do_next_request(Time now
) {
1115 // if reservation queue is empty, all are empty (i.e., no
1117 if(resv_heap
.empty()) {
1118 return NextReq::none();
1121 // try constraint (reservation) based scheduling
1123 auto& reserv
= resv_heap
.top();
1124 if (reserv
.has_request() &&
1125 reserv
.next_request().tag
.reservation
<= now
) {
1126 return NextReq(HeapId::reservation
);
1129 // no existing reservations before now, so try weight-based
1132 // all items that are within limit are eligible based on
1134 auto limits
= &limit_heap
.top();
1135 while (limits
->has_request() &&
1136 !limits
->next_request().tag
.ready
&&
1137 limits
->next_request().tag
.limit
<= now
) {
1138 limits
->next_request().tag
.ready
= true;
1139 ready_heap
.promote(*limits
);
1140 limit_heap
.demote(*limits
);
1142 limits
= &limit_heap
.top();
1145 auto& readys
= ready_heap
.top();
1146 if (readys
.has_request() &&
1147 readys
.next_request().tag
.ready
&&
1148 readys
.next_request().tag
.proportion
< max_tag
) {
1149 return NextReq(HeapId::ready
);
1152 // if nothing is schedulable by reservation or
1153 // proportion/weight, and if we allow limit break, try to
1154 // schedule something with the lowest proportion tag or
1155 // alternatively lowest reservation tag.
1156 if (at_limit
== AtLimit::Allow
) {
1157 if (readys
.has_request() &&
1158 readys
.next_request().tag
.proportion
< max_tag
) {
1159 return NextReq(HeapId::ready
);
1160 } else if (reserv
.has_request() &&
1161 reserv
.next_request().tag
.reservation
< max_tag
) {
1162 return NextReq(HeapId::reservation
);
1166 // nothing scheduled; make sure we re-run when next
1167 // reservation item or next limited item comes up
1169 Time next_call
= TimeMax
;
1170 if (resv_heap
.top().has_request()) {
1172 min_not_0_time(next_call
,
1173 resv_heap
.top().next_request().tag
.reservation
);
1175 if (limit_heap
.top().has_request()) {
1176 const auto& next
= limit_heap
.top().next_request();
1177 assert(!next
.tag
.ready
|| max_tag
== next
.tag
.proportion
);
1178 next_call
= min_not_0_time(next_call
, next
.tag
.limit
);
1180 if (next_call
< TimeMax
) {
1181 return NextReq(next_call
);
1183 return NextReq::none();
1185 } // do_next_request
1188 // if possible is not zero and less than current then return it;
1189 // otherwise return current; the idea is we're trying to find
1190 // the minimal time but ignoring zero
1191 static inline const Time
& min_not_0_time(const Time
& current
,
1192 const Time
& possible
) {
1193 return TimeZero
== possible
? current
: std::min(current
, possible
);
1198 * This is being called regularly by RunEvery. Every time it's
1199 * called it notes the time and delta counter (mark point) in a
1200 * deque. It also looks at the deque to find the most recent
1201 * mark point that is older than clean_age. It then walks the
1202 * map and delete all server entries that were last used before
1206 TimePoint now
= std::chrono::steady_clock::now();
1207 DataGuard
g(data_mtx
);
1208 clean_mark_points
.emplace_back(MarkPoint(now
, tick
));
1210 // first erase the super-old client records
1212 Counter erase_point
= last_erase_point
;
1213 auto point
= clean_mark_points
.front();
1214 while (point
.first
<= now
- erase_age
) {
1215 last_erase_point
= point
.second
;
1216 erase_point
= last_erase_point
;
1217 clean_mark_points
.pop_front();
1218 point
= clean_mark_points
.front();
1221 Counter idle_point
= 0;
1222 for (auto i
: clean_mark_points
) {
1223 if (i
.first
<= now
- idle_age
) {
1224 idle_point
= i
.second
;
1230 Counter erased_num
= 0;
1231 if (erase_point
> 0 || idle_point
> 0) {
1232 for (auto i
= client_map
.begin(); i
!= client_map
.end(); /* empty */) {
1235 erased_num
< erase_max
&&
1236 i2
->second
->last_tick
<= erase_point
) {
1237 delete_from_heaps(i2
->second
);
1238 client_map
.erase(i2
);
1240 } else if (idle_point
&& i2
->second
->last_tick
<= idle_point
) {
1241 i2
->second
->idle
= true;
1245 auto wperiod
= check_time
;
1246 if (erased_num
>= erase_max
) {
1247 wperiod
= duration_cast
<milliseconds
>(aggressive_check_time
);
1249 // clean finished, refresh
1250 last_erase_point
= 0;
1252 cleaning_job
->try_update(wperiod
);
1257 // data_mtx must be held by caller
1258 template<IndIntruHeapData
ClientRec::*C1
,typename C2
>
1259 void delete_from_heap(ClientRecRef
& client
,
1260 c::IndIntruHeap
<ClientRecRef
,ClientRec
,C1
,C2
,B
>& heap
) {
1261 auto i
= heap
.at(client
);
1266 // data_mtx must be held by caller
1267 void delete_from_heaps(ClientRecRef
& client
) {
1268 delete_from_heap(client
, resv_heap
);
1270 delete_from_heap(client
, prop_heap
);
1272 delete_from_heap(client
, limit_heap
);
1273 delete_from_heap(client
, ready_heap
);
1275 }; // class PriorityQueueBase
1278 template<typename C
, typename R
, bool IsDelayed
=false, bool U1
=false, unsigned B
=2>
1279 class PullPriorityQueue
: public PriorityQueueBase
<C
,R
,IsDelayed
,U1
,B
> {
1280 using super
= PriorityQueueBase
<C
,R
,IsDelayed
,U1
,B
>;
1284 // When a request is pulled, this is the return type.
1288 typename
super::RequestRef request
;
1293 typename
super::NextReqType type
;
1294 boost::variant
<Retn
,Time
> data
;
1296 bool is_none() const { return type
== super::NextReqType::none
; }
1298 bool is_retn() const { return type
== super::NextReqType::returning
; }
1300 return boost::get
<Retn
>(data
);
1303 bool is_future() const { return type
== super::NextReqType::future
; }
1304 Time
getTime() const { return boost::get
<Time
>(data
); }
1309 ProfileTimer
<std::chrono::nanoseconds
> pull_request_timer
;
1310 ProfileTimer
<std::chrono::nanoseconds
> add_request_timer
;
1313 template<typename Rep
, typename Per
>
1314 PullPriorityQueue(typename
super::ClientInfoFunc _client_info_f
,
1315 std::chrono::duration
<Rep
,Per
> _idle_age
,
1316 std::chrono::duration
<Rep
,Per
> _erase_age
,
1317 std::chrono::duration
<Rep
,Per
> _check_time
,
1318 AtLimitParam at_limit_param
= AtLimit::Wait
,
1319 double _anticipation_timeout
= 0.0) :
1320 super(_client_info_f
,
1321 _idle_age
, _erase_age
, _check_time
,
1322 at_limit_param
, _anticipation_timeout
)
1328 // pull convenience constructor
1329 PullPriorityQueue(typename
super::ClientInfoFunc _client_info_f
,
1330 AtLimitParam at_limit_param
= AtLimit::Wait
,
1331 double _anticipation_timeout
= 0.0) :
1332 PullPriorityQueue(_client_info_f
,
1335 standard_check_time
,
1337 _anticipation_timeout
)
1343 int add_request(R
&& request
,
1345 const ReqParams
& req_params
,
1346 const Cost cost
= 1u) {
1347 return add_request(typename
super::RequestRef(new R(std::move(request
))),
1355 int add_request(R
&& request
,
1357 const Cost cost
= 1u) {
1358 static const ReqParams null_req_params
;
1359 return add_request(typename
super::RequestRef(new R(std::move(request
))),
1367 int add_request_time(R
&& request
,
1369 const ReqParams
& req_params
,
1371 const Cost cost
= 1u) {
1372 return add_request(typename
super::RequestRef(new R(std::move(request
))),
1380 int add_request(typename
super::RequestRef
&& request
,
1382 const ReqParams
& req_params
,
1383 const Cost cost
= 1u) {
1384 return add_request(request
, req_params
, client_id
, get_time(), cost
);
1388 int add_request(typename
super::RequestRef
&& request
,
1390 const Cost cost
= 1u) {
1391 static const ReqParams null_req_params
;
1392 return add_request(request
, null_req_params
, client_id
, get_time(), cost
);
1396 // this does the work; the versions above provide alternate interfaces
1397 int add_request(typename
super::RequestRef
&& request
,
1399 const ReqParams
& req_params
,
1401 const Cost cost
= 1u) {
1402 typename
super::DataGuard
g(this->data_mtx
);
1404 add_request_timer
.start();
1406 int r
= super::do_add_request(std::move(request
),
1411 // no call to schedule_request for pull version
1413 add_request_timer
.stop();
1419 inline PullReq
pull_request() {
1420 return pull_request(get_time());
1424 PullReq
pull_request(const Time now
) {
1426 typename
super::DataGuard
g(this->data_mtx
);
1428 pull_request_timer
.start();
1431 typename
super::NextReq next
= super::do_next_request(now
);
1432 result
.type
= next
.type
;
1434 case super::NextReqType::none
:
1436 case super::NextReqType::future
:
1437 result
.data
= next
.when_ready
;
1439 case super::NextReqType::returning
:
1440 // to avoid nesting, break out and let code below handle this case
1446 // we'll only get here if we're returning an entry
1449 [&] (PullReq
& pull_result
, PhaseType phase
) ->
1450 std::function
<void(const C
&,
1452 typename
super::RequestRef
&)> {
1453 return [&pull_result
, phase
](const C
& client
,
1454 const Cost request_cost
,
1455 typename
super::RequestRef
& request
) {
1456 pull_result
.data
= typename
PullReq::Retn
{ client
,
1463 switch(next
.heap_id
) {
1464 case super::HeapId::reservation
:
1465 (void) super::pop_process_request(this->resv_heap
,
1467 PhaseType::reservation
));
1468 ++this->reserv_sched_count
;
1470 case super::HeapId::ready
:
1472 auto tag
= super::pop_process_request(this->ready_heap
,
1473 process_f(result
, PhaseType::priority
));
1474 // need to use retn temporarily
1475 auto& retn
= boost::get
<typename
PullReq::Retn
>(result
.data
);
1476 super::reduce_reservation_tags(retn
.client
, tag
);
1478 ++this->prop_sched_count
;
1485 pull_request_timer
.stop();
1494 // data_mtx should be held when called; unfortunately this
1495 // function has to be repeated in both push & pull
1497 typename
super::NextReq
next_request() {
1498 return next_request(get_time());
1500 }; // class PullPriorityQueue
1504 template<typename C
, typename R
, bool IsDelayed
=false, bool U1
=false, unsigned B
=2>
1505 class PushPriorityQueue
: public PriorityQueueBase
<C
,R
,IsDelayed
,U1
,B
> {
1509 using super
= PriorityQueueBase
<C
,R
,IsDelayed
,U1
,B
>;
1513 // a function to see whether the server can handle another request
1514 using CanHandleRequestFunc
= std::function
<bool(void)>;
1516 // a function to submit a request to the server; the second
1517 // parameter is a callback when it's completed
1518 using HandleRequestFunc
=
1519 std::function
<void(const C
&,typename
super::RequestRef
,PhaseType
,uint64_t)>;
1523 CanHandleRequestFunc can_handle_f
;
1524 HandleRequestFunc handle_f
;
1525 // for handling timed scheduling
1526 std::mutex sched_ahead_mtx
;
1527 std::condition_variable sched_ahead_cv
;
1528 Time sched_ahead_when
= TimeZero
;
1532 ProfileTimer
<std::chrono::nanoseconds
> add_request_timer
;
1533 ProfileTimer
<std::chrono::nanoseconds
> request_complete_timer
;
1537 // NB: threads declared last, so constructed last and destructed first
1539 std::thread sched_ahead_thd
;
1543 // push full constructor
1544 template<typename Rep
, typename Per
>
1545 PushPriorityQueue(typename
super::ClientInfoFunc _client_info_f
,
1546 CanHandleRequestFunc _can_handle_f
,
1547 HandleRequestFunc _handle_f
,
1548 std::chrono::duration
<Rep
,Per
> _idle_age
,
1549 std::chrono::duration
<Rep
,Per
> _erase_age
,
1550 std::chrono::duration
<Rep
,Per
> _check_time
,
1551 AtLimitParam at_limit_param
= AtLimit::Wait
,
1552 double anticipation_timeout
= 0.0) :
1553 super(_client_info_f
,
1554 _idle_age
, _erase_age
, _check_time
,
1555 at_limit_param
, anticipation_timeout
)
1557 can_handle_f
= _can_handle_f
;
1558 handle_f
= _handle_f
;
1559 sched_ahead_thd
= std::thread(&PushPriorityQueue::run_sched_ahead
, this);
1563 // push convenience constructor
1564 PushPriorityQueue(typename
super::ClientInfoFunc _client_info_f
,
1565 CanHandleRequestFunc _can_handle_f
,
1566 HandleRequestFunc _handle_f
,
1567 AtLimitParam at_limit_param
= AtLimit::Wait
,
1568 double _anticipation_timeout
= 0.0) :
1569 PushPriorityQueue(_client_info_f
,
1574 standard_check_time
,
1576 _anticipation_timeout
)
1582 ~PushPriorityQueue() {
1583 this->finishing
= true;
1585 std::lock_guard
<std::mutex
> l(sched_ahead_mtx
);
1586 sched_ahead_cv
.notify_one();
1588 sched_ahead_thd
.join();
1593 int add_request(R
&& request
,
1595 const ReqParams
& req_params
,
1596 const Cost cost
= 1u) {
1597 return add_request(typename
super::RequestRef(new R(std::move(request
))),
1605 int add_request(typename
super::RequestRef
&& request
,
1607 const ReqParams
& req_params
,
1608 const Cost cost
= 1u) {
1609 return add_request(request
, req_params
, client_id
, get_time(), cost
);
1613 int add_request_time(const R
& request
,
1615 const ReqParams
& req_params
,
1617 const Cost cost
= 1u) {
1618 return add_request(typename
super::RequestRef(new R(request
)),
1626 int add_request(typename
super::RequestRef
&& request
,
1628 const ReqParams
& req_params
,
1630 const Cost cost
= 1u) {
1631 typename
super::DataGuard
g(this->data_mtx
);
1633 add_request_timer
.start();
1635 int r
= super::do_add_request(std::move(request
),
1644 add_request_timer
.stop();
1650 void request_completed() {
1651 typename
super::DataGuard
g(this->data_mtx
);
1653 request_complete_timer
.start();
1657 request_complete_timer
.stop();
1663 // data_mtx should be held when called; furthermore, the heap
1664 // should not be empty and the top element of the heap should
1665 // not be already handled
1667 // NOTE: the use of "super::ClientRec" in either the template
1668 // construct or as a parameter to submit_top_request generated
1669 // a compiler error in g++ 4.8.4, when ClientRec was
1670 // "protected" rather than "public". By g++ 6.3.1 this was not
1671 // an issue. But for backwards compatibility
1672 // PriorityQueueBase::ClientRec is public.
1673 template<typename C1
,
1674 IndIntruHeapData
super::ClientRec::*C2
,
1677 typename
super::RequestMeta
1678 submit_top_request(IndIntruHeap
<C1
,typename
super::ClientRec
,C2
,C3
,B4
>& heap
,
1681 RequestTag tag
= super::pop_process_request(heap
,
1682 [this, phase
, &client_result
]
1684 const Cost request_cost
,
1685 typename
super::RequestRef
& request
) {
1686 client_result
= client
;
1687 handle_f(client
, std::move(request
), phase
, request_cost
);
1689 typename
super::RequestMeta
req(client_result
, tag
);
1694 // data_mtx should be held when called
1695 void submit_request(typename
super::HeapId heap_id
) {
1697 case super::HeapId::reservation
:
1698 // don't need to note client
1699 (void) submit_top_request(this->resv_heap
, PhaseType::reservation
);
1700 // unlike the other two cases, we do not reduce reservation
1702 ++this->reserv_sched_count
;
1704 case super::HeapId::ready
:
1706 auto req
= submit_top_request(this->ready_heap
, PhaseType::priority
);
1707 super::reduce_reservation_tags(req
.client_id
, req
.tag
);
1709 ++this->prop_sched_count
;
1717 // data_mtx should be held when called; unfortunately this
1718 // function has to be repeated in both push & pull
1720 typename
super::NextReq
next_request() {
1721 return next_request(get_time());
1725 // data_mtx should be held when called; overrides member
1726 // function in base class to add check for whether a request can
1727 // be pushed to the server
1728 typename
super::NextReq
next_request(Time now
) {
1729 if (!can_handle_f()) {
1730 typename
super::NextReq result
;
1731 result
.type
= super::NextReqType::none
;
1734 return super::do_next_request(now
);
1739 // data_mtx should be held when called
1740 void schedule_request() {
1741 typename
super::NextReq next_req
= next_request();
1742 switch (next_req
.type
) {
1743 case super::NextReqType::none
:
1745 case super::NextReqType::future
:
1746 sched_at(next_req
.when_ready
);
1748 case super::NextReqType::returning
:
1749 submit_request(next_req
.heap_id
);
1757 // this is the thread that handles running schedule_request at
1758 // future times when nothing can be scheduled immediately
1759 void run_sched_ahead() {
1760 std::unique_lock
<std::mutex
> l(sched_ahead_mtx
);
1762 while (!this->finishing
) {
1763 // predicate for cond.wait()
1764 const auto pred
= [this] () -> bool { return this->finishing
; };
1766 if (TimeZero
== sched_ahead_when
) {
1767 sched_ahead_cv
.wait(l
, pred
);
1769 // cast from Time -> duration<Time> -> Duration -> TimePoint
1770 const auto until
= typename
super::TimePoint
{
1771 duration_cast
<typename
super::Duration
>(
1772 std::chrono::duration
<Time
>{sched_ahead_when
})};
1773 sched_ahead_cv
.wait_until(l
, until
, pred
);
1774 sched_ahead_when
= TimeZero
;
1775 if (this->finishing
) return;
1778 if (!this->finishing
) {
1779 typename
super::DataGuard
g(this->data_mtx
);
1788 void sched_at(Time when
) {
1789 std::lock_guard
<std::mutex
> l(sched_ahead_mtx
);
1790 if (this->finishing
) return;
1791 if (TimeZero
== sched_ahead_when
|| when
< sched_ahead_when
) {
1792 sched_ahead_when
= when
;
1793 sched_ahead_cv
.notify_one();
1796 }; // class PushPriorityQueue
1798 } // namespace dmclock
1799 } // namespace crimson