]> git.proxmox.com Git - ceph.git/blob - ceph/src/dmclock/src/dmclock_server.h
8aaad3977264847c0c3351d76eff05da7d629171
[ceph.git] / ceph / src / dmclock / src / dmclock_server.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 /*
5 * Copyright (C) 2017 Red Hat Inc.
6 */
7
8
9 #pragma once
10
/* COMPILATION OPTIONS
 *
 * By default we include an optimization over the originally published
 * dmclock algorithm using not the values of rho and delta that were
 * sent in with a request but instead the most recent rho and delta
 * values from the request's client. To restore the algorithm's
 * original behavior, define DO_NOT_DELAY_TAG_CALC (i.e., compiler
 * argument -DDO_NOT_DELAY_TAG_CALC).
 *
 * The prop_heap does not seem to be necessary. The only thing it
 * would help with is quickly finding the minimum proportion/priority
 * when an idle client became active. To have the code maintain the
 * proportional heap, define USE_PROP_HEAP (i.e., compiler argument
 * -DUSE_PROP_HEAP).
 */
26
27 #include <assert.h>
28
29 #include <cmath>
30 #include <memory>
31 #include <map>
32 #include <deque>
33 #include <queue>
34 #include <atomic>
35 #include <mutex>
36 #include <condition_variable>
37 #include <thread>
38 #include <iostream>
39 #include <sstream>
40 #include <limits>
41
42 #include <boost/variant.hpp>
43
44 #include "indirect_intrusive_heap.h"
45 #include "run_every.h"
46 #include "dmclock_util.h"
47 #include "dmclock_recs.h"
48
49 #ifdef PROFILE
50 #include "profile.h"
51 #endif
52
53 #include "gtest/gtest_prod.h"
54
55
56 namespace crimson {
57
58 namespace dmclock {
59
60 namespace c = crimson;
61
62 constexpr double max_tag = std::numeric_limits<double>::is_iec559 ?
63 std::numeric_limits<double>::infinity() :
64 std::numeric_limits<double>::max();
65 constexpr double min_tag = std::numeric_limits<double>::is_iec559 ?
66 -std::numeric_limits<double>::infinity() :
67 std::numeric_limits<double>::lowest();
68 constexpr uint tag_modulo = 1000000;
69
struct ClientInfo {
  const double reservation; // minimum
  const double weight; // proportional
  const double limit; // maximum

  // multiplicative inverses of above, which we use in calculations
  // and don't want to recalculate repeatedly
  const double reservation_inv;
  const double weight_inv;
  const double limit_inv;

  // order parameters -- min, "normal", max
  ClientInfo(double _reservation, double _weight, double _limit) :
    reservation(_reservation),
    weight(_weight),
    limit(_limit),
    reservation_inv(0.0 == reservation ? 0.0 : 1.0 / reservation),
    weight_inv(     0.0 == weight ? 0.0 : 1.0 / weight),
    limit_inv(      0.0 == limit ? 0.0 : 1.0 / limit)
  {
    // empty
  }


  friend std::ostream& operator<<(std::ostream& out,
                                  const ClientInfo& client) {
    out <<
      "{ ClientInfo:: r:" << client.reservation <<
      " w:" << std::fixed << client.weight <<
      " l:" << std::fixed << client.limit <<
      " 1/r:" << std::fixed << client.reservation_inv <<
      " 1/w:" << std::fixed << client.weight_inv <<
      " 1/l:" << std::fixed << client.limit_inv <<
      " }";
    return out;
  }
}; // class ClientInfo
107
108
109 struct RequestTag {
110 double reservation;
111 double proportion;
112 double limit;
113 bool ready; // true when within limit
114 #ifndef DO_NOT_DELAY_TAG_CALC
115 Time arrival;
116 #endif
117
118 RequestTag(const RequestTag& prev_tag,
119 const ClientInfo& client,
120 const uint32_t delta,
121 const uint32_t rho,
122 const Time time,
123 const double cost = 0.0) :
124 reservation(cost + tag_calc(time,
125 prev_tag.reservation,
126 client.reservation_inv,
127 rho,
128 true)),
129 proportion(tag_calc(time,
130 prev_tag.proportion,
131 client.weight_inv,
132 delta,
133 true)),
134 limit(tag_calc(time,
135 prev_tag.limit,
136 client.limit_inv,
137 delta,
138 false)),
139 ready(false)
140 #ifndef DO_NOT_DELAY_TAG_CALC
141 , arrival(time)
142 #endif
143 {
144 assert(reservation < max_tag || proportion < max_tag);
145 }
146
147 RequestTag(const RequestTag& prev_tag,
148 const ClientInfo& client,
149 const ReqParams req_params,
150 const Time time,
151 const double cost = 0.0) :
152 RequestTag(prev_tag, client, req_params.delta, req_params.rho, time, cost)
153 { /* empty */ }
154
155 RequestTag(double _res, double _prop, double _lim, const Time _arrival) :
156 reservation(_res),
157 proportion(_prop),
158 limit(_lim),
159 ready(false)
160 #ifndef DO_NOT_DELAY_TAG_CALC
161 , arrival(_arrival)
162 #endif
163 {
164 assert(reservation < max_tag || proportion < max_tag);
165 }
166
167 RequestTag(const RequestTag& other) :
168 reservation(other.reservation),
169 proportion(other.proportion),
170 limit(other.limit),
171 ready(other.ready)
172 #ifndef DO_NOT_DELAY_TAG_CALC
173 , arrival(other.arrival)
174 #endif
175 {
176 // empty
177 }
178
179 static std::string format_tag_change(double before, double after) {
180 if (before == after) {
181 return std::string("same");
182 } else {
183 std::stringstream ss;
184 ss << format_tag(before) << "=>" << format_tag(after);
185 return ss.str();
186 }
187 }
188
189 static std::string format_tag(double value) {
190 if (max_tag == value) {
191 return std::string("max");
192 } else if (min_tag == value) {
193 return std::string("min");
194 } else {
195 return format_time(value, tag_modulo);
196 }
197 }
198
199 private:
200
201 static double tag_calc(const Time time,
202 double prev,
203 double increment,
204 uint32_t dist_req_val,
205 bool extreme_is_high) {
206 if (0.0 == increment) {
207 return extreme_is_high ? max_tag : min_tag;
208 } else {
209 if (0 != dist_req_val) {
210 increment *= dist_req_val;
211 }
212 return std::max(time, prev + increment);
213 }
214 }
215
216 friend std::ostream& operator<<(std::ostream& out,
217 const RequestTag& tag) {
218 out <<
219 "{ RequestTag:: ready:" << (tag.ready ? "true" : "false") <<
220 " r:" << format_tag(tag.reservation) <<
221 " p:" << format_tag(tag.proportion) <<
222 " l:" << format_tag(tag.limit) <<
223 #if 0 // try to resolve this to make sure Time is operator<<'able.
224 #ifndef DO_NOT_DELAY_TAG_CALC
225 " arrival:" << tag.arrival <<
226 #endif
227 #endif
228 " }";
229 return out;
230 }
231 }; // class RequestTag
232
233
234 // C is client identifier type, R is request type, B is heap
235 // branching factor
236 template<typename C, typename R, uint B>
237 class PriorityQueueBase {
238 FRIEND_TEST(dmclock_server, client_idle_erase);
239
240 public:
241
242 using RequestRef = std::unique_ptr<R>;
243
244 protected:
245
246 using TimePoint = decltype(std::chrono::steady_clock::now());
247 using Duration = std::chrono::milliseconds;
248 using MarkPoint = std::pair<TimePoint,Counter>;
249
250 enum class ReadyOption {ignore, lowers, raises};
251
252 // forward decl for friend decls
253 template<double RequestTag::*, ReadyOption, bool>
254 struct ClientCompare;
255
256 class ClientReq {
257 friend PriorityQueueBase;
258
259 RequestTag tag;
260 C client_id;
261 RequestRef request;
262
263 public:
264
265 ClientReq(const RequestTag& _tag,
266 const C& _client_id,
267 RequestRef&& _request) :
268 tag(_tag),
269 client_id(_client_id),
270 request(std::move(_request))
271 {
272 // empty
273 }
274
275 friend std::ostream& operator<<(std::ostream& out, const ClientReq& c) {
276 out << "{ ClientReq:: tag:" << c.tag << " client:" <<
277 c.client_id << " }";
278 return out;
279 }
280 }; // class ClientReq
281
282 public:
283
284 // NOTE: ClientRec is in the "public" section for compatibility
285 // with g++ 4.8.4, which complains if it's not. By g++ 6.3.1
286 // ClientRec could be "protected" with no issue. [See comments
287 // associated with function submit_top_request.]
288 class ClientRec {
289 friend PriorityQueueBase<C,R,B>;
290
291 C client;
292 RequestTag prev_tag;
293 std::deque<ClientReq> requests;
294
295 // amount added from the proportion tag as a result of
296 // an idle client becoming unidle
297 double prop_delta = 0.0;
298
299 c::IndIntruHeapData reserv_heap_data;
300 c::IndIntruHeapData lim_heap_data;
301 c::IndIntruHeapData ready_heap_data;
302 #if USE_PROP_HEAP
303 c::IndIntruHeapData prop_heap_data;
304 #endif
305
306 public:
307
308 ClientInfo info;
309 bool idle;
310 Counter last_tick;
311 uint32_t cur_rho;
312 uint32_t cur_delta;
313
314 ClientRec(C _client,
315 const ClientInfo& _info,
316 Counter current_tick) :
317 client(_client),
318 prev_tag(0.0, 0.0, 0.0, TimeZero),
319 info(_info),
320 idle(true),
321 last_tick(current_tick),
322 cur_rho(1),
323 cur_delta(1)
324 {
325 // empty
326 }
327
328 inline const RequestTag& get_req_tag() const {
329 return prev_tag;
330 }
331
332 static inline void assign_unpinned_tag(double& lhs, const double rhs) {
333 if (rhs != max_tag && rhs != min_tag) {
334 lhs = rhs;
335 }
336 }
337
338 inline void update_req_tag(const RequestTag& _prev,
339 const Counter& _tick) {
340 assign_unpinned_tag(prev_tag.reservation, _prev.reservation);
341 assign_unpinned_tag(prev_tag.limit, _prev.limit);
342 assign_unpinned_tag(prev_tag.proportion, _prev.proportion);
343 last_tick = _tick;
344 }
345
346 inline void add_request(const RequestTag& tag,
347 const C& client_id,
348 RequestRef&& request) {
349 requests.emplace_back(ClientReq(tag, client_id, std::move(request)));
350 }
351
352 inline const ClientReq& next_request() const {
353 return requests.front();
354 }
355
356 inline ClientReq& next_request() {
357 return requests.front();
358 }
359
360 inline void pop_request() {
361 requests.pop_front();
362 }
363
364 inline bool has_request() const {
365 return !requests.empty();
366 }
367
368 inline size_t request_count() const {
369 return requests.size();
370 }
371
372 // NB: because a deque is the underlying structure, this
373 // operation might be expensive
374 bool remove_by_req_filter_fw(std::function<bool(const R&)> filter_accum) {
375 bool any_removed = false;
376 for (auto i = requests.begin();
377 i != requests.end();
378 /* no inc */) {
379 if (filter_accum(*i->request)) {
380 any_removed = true;
381 i = requests.erase(i);
382 } else {
383 ++i;
384 }
385 }
386 return any_removed;
387 }
388
389 // NB: because a deque is the underlying structure, this
390 // operation might be expensive
391 bool remove_by_req_filter_bw(std::function<bool(const R&)> filter_accum) {
392 bool any_removed = false;
393 for (auto i = requests.rbegin();
394 i != requests.rend();
395 /* no inc */) {
396 if (filter_accum(*i->request)) {
397 any_removed = true;
398 i = decltype(i){ requests.erase(std::next(i).base()) };
399 } else {
400 ++i;
401 }
402 }
403 return any_removed;
404 }
405
406 inline bool
407 remove_by_req_filter(std::function<bool(const R&)> filter_accum,
408 bool visit_backwards) {
409 if (visit_backwards) {
410 return remove_by_req_filter_bw(filter_accum);
411 } else {
412 return remove_by_req_filter_fw(filter_accum);
413 }
414 }
415
416 friend std::ostream&
417 operator<<(std::ostream& out,
418 const typename PriorityQueueBase<C,R,B>::ClientRec& e) {
419 out << "{ ClientRec::" <<
420 " client:" << e.client <<
421 " prev_tag:" << e.prev_tag <<
422 " req_count:" << e.requests.size() <<
423 " top_req:";
424 if (e.has_request()) {
425 out << e.next_request();
426 } else {
427 out << "none";
428 }
429 out << " }";
430
431 return out;
432 }
433 }; // class ClientRec
434
435 using ClientRecRef = std::shared_ptr<ClientRec>;
436
437 // when we try to get the next request, we'll be in one of three
438 // situations -- we'll have one to return, have one that can
439 // fire in the future, or not have any
440 enum class NextReqType { returning, future, none };
441
442 // specifies which queue next request will get popped from
443 enum class HeapId { reservation, ready };
444
445 // this is returned from next_req to tell the caller the situation
446 struct NextReq {
447 NextReqType type;
448 union {
449 HeapId heap_id;
450 Time when_ready;
451 };
452 };
453
454
455 // a function that can be called to look up client information
456 using ClientInfoFunc = std::function<ClientInfo(const C&)>;
457
458
459 bool empty() const {
460 DataGuard g(data_mtx);
461 return (resv_heap.empty() || ! resv_heap.top().has_request());
462 }
463
464
465 size_t client_count() const {
466 DataGuard g(data_mtx);
467 return resv_heap.size();
468 }
469
470
471 size_t request_count() const {
472 DataGuard g(data_mtx);
473 size_t total = 0;
474 for (auto i = resv_heap.cbegin(); i != resv_heap.cend(); ++i) {
475 total += i->request_count();
476 }
477 return total;
478 }
479
480
481 bool remove_by_req_filter(std::function<bool(const R&)> filter_accum,
482 bool visit_backwards = false) {
483 bool any_removed = false;
484 DataGuard g(data_mtx);
485 for (auto i : client_map) {
486 bool modified =
487 i.second->remove_by_req_filter(filter_accum, visit_backwards);
488 if (modified) {
489 resv_heap.adjust(*i.second);
490 limit_heap.adjust(*i.second);
491 ready_heap.adjust(*i.second);
492 #if USE_PROP_HEAP
493 prop_heap.adjust(*i.second);
494 #endif
495 any_removed = true;
496 }
497 }
498 return any_removed;
499 }
500
501
502 // use as a default value when no accumulator is provide
503 static void request_sink(const R& req) {
504 // do nothing
505 }
506
507
508 void remove_by_client(const C& client,
509 bool reverse = false,
510 std::function<void (const R&)> accum = request_sink) {
511 DataGuard g(data_mtx);
512
513 auto i = client_map.find(client);
514
515 if (i == client_map.end()) return;
516
517 if (reverse) {
518 for (auto j = i->second->requests.rbegin();
519 j != i->second->requests.rend();
520 ++j) {
521 accum(*j->request);
522 }
523 } else {
524 for (auto j = i->second->requests.begin();
525 j != i->second->requests.end();
526 ++j) {
527 accum(*j->request);
528 }
529 }
530
531 i->second->requests.clear();
532
533 resv_heap.adjust(*i->second);
534 limit_heap.adjust(*i->second);
535 ready_heap.adjust(*i->second);
536 #if USE_PROP_HEAP
537 prop_heap.adjust(*i->second);
538 #endif
539 }
540
541
542 uint get_heap_branching_factor() const {
543 return B;
544 }
545
546
547 friend std::ostream& operator<<(std::ostream& out,
548 const PriorityQueueBase& q) {
549 std::lock_guard<decltype(q.data_mtx)> guard(q.data_mtx);
550
551 out << "{ PriorityQueue::";
552 for (const auto& c : q.client_map) {
553 out << " { client:" << c.first << ", record:" << *c.second <<
554 " }";
555 }
556 if (!q.resv_heap.empty()) {
557 const auto& resv = q.resv_heap.top();
558 out << " { reservation_top:" << resv << " }";
559 const auto& ready = q.ready_heap.top();
560 out << " { ready_top:" << ready << " }";
561 const auto& limit = q.limit_heap.top();
562 out << " { limit_top:" << limit << " }";
563 } else {
564 out << " HEAPS-EMPTY";
565 }
566 out << " }";
567
568 return out;
569 }
570
571 // for debugging
572 void display_queues(std::ostream& out,
573 bool show_res = true,
574 bool show_lim = true,
575 bool show_ready = true,
576 bool show_prop = true) const {
577 auto filter = [](const ClientRec& e)->bool { return true; };
578 DataGuard g(data_mtx);
579 if (show_res) {
580 resv_heap.display_sorted(out << "RESER:", filter);
581 }
582 if (show_lim) {
583 limit_heap.display_sorted(out << "LIMIT:", filter);
584 }
585 if (show_ready) {
586 ready_heap.display_sorted(out << "READY:", filter);
587 }
588 #if USE_PROP_HEAP
589 if (show_prop) {
590 prop_heap.display_sorted(out << "PROPO:", filter);
591 }
592 #endif
593 } // display_queues
594
595
596 protected:
597
598 // The ClientCompare functor is essentially doing a precedes?
599 // operator, returning true if and only if the first parameter
600 // must precede the second parameter. If the second must precede
601 // the first, or if they are equivalent, false should be
602 // returned. The reason for this behavior is that it will be
603 // called to test if two items are out of order and if true is
604 // returned it will reverse the items. Therefore false is the
605 // default return when it doesn't matter to prevent unnecessary
606 // re-ordering.
607 //
608 // The template is supporting variations in sorting based on the
609 // heap in question and allowing these variations to be handled
610 // at compile-time.
611 //
612 // tag_field determines which tag is being used for comparison
613 //
614 // ready_opt determines how the ready flag influences the sort
615 //
616 // use_prop_delta determines whether the proportional delta is
617 // added in for comparison
618 template<double RequestTag::*tag_field,
619 ReadyOption ready_opt,
620 bool use_prop_delta>
621 struct ClientCompare {
622 bool operator()(const ClientRec& n1, const ClientRec& n2) const {
623 if (n1.has_request()) {
624 if (n2.has_request()) {
625 const auto& t1 = n1.next_request().tag;
626 const auto& t2 = n2.next_request().tag;
627 if (ReadyOption::ignore == ready_opt || t1.ready == t2.ready) {
628 // if we don't care about ready or the ready values are the same
629 if (use_prop_delta) {
630 return (t1.*tag_field + n1.prop_delta) <
631 (t2.*tag_field + n2.prop_delta);
632 } else {
633 return t1.*tag_field < t2.*tag_field;
634 }
635 } else if (ReadyOption::raises == ready_opt) {
636 // use_ready == true && the ready fields are different
637 return t1.ready;
638 } else {
639 return t2.ready;
640 }
641 } else {
642 // n1 has request but n2 does not
643 return true;
644 }
645 } else if (n2.has_request()) {
646 // n2 has request but n1 does not
647 return false;
648 } else {
649 // both have none; keep stable w false
650 return false;
651 }
652 }
653 };
654
655 ClientInfoFunc client_info_f;
656
657 mutable std::mutex data_mtx;
658 using DataGuard = std::lock_guard<decltype(data_mtx)>;
659
660 // stable mapping between client ids and client queues
661 std::map<C,ClientRecRef> client_map;
662
663 c::IndIntruHeap<ClientRecRef,
664 ClientRec,
665 &ClientRec::reserv_heap_data,
666 ClientCompare<&RequestTag::reservation,
667 ReadyOption::ignore,
668 false>,
669 B> resv_heap;
670 #if USE_PROP_HEAP
671 c::IndIntruHeap<ClientRecRef,
672 ClientRec,
673 &ClientRec::prop_heap_data,
674 ClientCompare<&RequestTag::proportion,
675 ReadyOption::ignore,
676 true>,
677 B> prop_heap;
678 #endif
679 c::IndIntruHeap<ClientRecRef,
680 ClientRec,
681 &ClientRec::lim_heap_data,
682 ClientCompare<&RequestTag::limit,
683 ReadyOption::lowers,
684 false>,
685 B> limit_heap;
686 c::IndIntruHeap<ClientRecRef,
687 ClientRec,
688 &ClientRec::ready_heap_data,
689 ClientCompare<&RequestTag::proportion,
690 ReadyOption::raises,
691 true>,
692 B> ready_heap;
693
694 // if all reservations are met and all other requestes are under
695 // limit, this will allow the request next in terms of
696 // proportion to still get issued
697 bool allow_limit_break;
698
699 std::atomic_bool finishing;
700
701 // every request creates a tick
702 Counter tick = 0;
703
704 // performance data collection
705 size_t reserv_sched_count = 0;
706 size_t prop_sched_count = 0;
707 size_t limit_break_sched_count = 0;
708
709 Duration idle_age;
710 Duration erase_age;
711 Duration check_time;
712 std::deque<MarkPoint> clean_mark_points;
713
714 // NB: All threads declared at end, so they're destructed first!
715
716 std::unique_ptr<RunEvery> cleaning_job;
717
718
719 // COMMON constructor that others feed into; we can accept three
720 // different variations of durations
721 template<typename Rep, typename Per>
722 PriorityQueueBase(ClientInfoFunc _client_info_f,
723 std::chrono::duration<Rep,Per> _idle_age,
724 std::chrono::duration<Rep,Per> _erase_age,
725 std::chrono::duration<Rep,Per> _check_time,
726 bool _allow_limit_break) :
727 client_info_f(_client_info_f),
728 allow_limit_break(_allow_limit_break),
729 finishing(false),
730 idle_age(std::chrono::duration_cast<Duration>(_idle_age)),
731 erase_age(std::chrono::duration_cast<Duration>(_erase_age)),
732 check_time(std::chrono::duration_cast<Duration>(_check_time))
733 {
734 assert(_erase_age >= _idle_age);
735 assert(_check_time < _idle_age);
736 cleaning_job =
737 std::unique_ptr<RunEvery>(
738 new RunEvery(check_time,
739 std::bind(&PriorityQueueBase::do_clean, this)));
740 }
741
742
743 ~PriorityQueueBase() {
744 finishing = true;
745 }
746
747
748 // data_mtx must be held by caller
749 void do_add_request(RequestRef&& request,
750 const C& client_id,
751 const ReqParams& req_params,
752 const Time time,
753 const double cost = 0.0) {
754 ++tick;
755
756 // this pointer will help us create a reference to a shared
757 // pointer, no matter which of two codepaths we take
758 ClientRec* temp_client;
759
760 auto client_it = client_map.find(client_id);
761 if (client_map.end() != client_it) {
762 temp_client = &(*client_it->second); // address of obj of shared_ptr
763 } else {
764 ClientInfo info = client_info_f(client_id);
765 ClientRecRef client_rec =
766 std::make_shared<ClientRec>(client_id, info, tick);
767 resv_heap.push(client_rec);
768 #if USE_PROP_HEAP
769 prop_heap.push(client_rec);
770 #endif
771 limit_heap.push(client_rec);
772 ready_heap.push(client_rec);
773 client_map[client_id] = client_rec;
774 temp_client = &(*client_rec); // address of obj of shared_ptr
775 }
776
777 // for convenience, we'll create a reference to the shared pointer
778 ClientRec& client = *temp_client;
779
780 if (client.idle) {
781 // We need to do an adjustment so that idle clients compete
782 // fairly on proportional tags since those tags may have
783 // drifted from real-time. Either use the lowest existing
784 // proportion tag -- O(1) -- or the client with the lowest
785 // previous proportion tag -- O(n) where n = # clients.
786 //
787 // So we don't have to maintain a propotional queue that
788 // keeps the minimum on proportional tag alone (we're
789 // instead using a ready queue), we'll have to check each
790 // client.
791 //
792 // The alternative would be to maintain a proportional queue
793 // (define USE_PROP_TAG) and do an O(1) operation here.
794
795 // Was unable to confirm whether equality testing on
796 // std::numeric_limits<double>::max() is guaranteed, so
797 // we'll use a compile-time calculated trigger that is one
798 // third the max, which should be much larger than any
799 // expected organic value.
800 constexpr double lowest_prop_tag_trigger =
801 std::numeric_limits<double>::max() / 3.0;
802
803 double lowest_prop_tag = std::numeric_limits<double>::max();
804 for (auto const &c : client_map) {
805 // don't use ourselves (or anything else that might be
806 // listed as idle) since we're now in the map
807 if (!c.second->idle) {
808 double p;
809 // use either lowest proportion tag or previous proportion tag
810 if (c.second->has_request()) {
811 p = c.second->next_request().tag.proportion +
812 c.second->prop_delta;
813 } else {
814 p = c.second->get_req_tag().proportion + c.second->prop_delta;
815 }
816
817 if (p < lowest_prop_tag) {
818 lowest_prop_tag = p;
819 }
820 }
821 }
822
823 // if this conditional does not fire, it
824 if (lowest_prop_tag < lowest_prop_tag_trigger) {
825 client.prop_delta = lowest_prop_tag - time;
826 }
827 client.idle = false;
828 } // if this client was idle
829
830 #ifndef DO_NOT_DELAY_TAG_CALC
831 RequestTag tag(0, 0, 0, time);
832
833 if (!client.has_request()) {
834 tag = RequestTag(client.get_req_tag(),
835 client.info,
836 req_params,
837 time,
838 cost);
839
840 // copy tag to previous tag for client
841 client.update_req_tag(tag, tick);
842 }
843 #else
844 RequestTag tag(client.get_req_tag(), client.info, req_params, time, cost);
845 // copy tag to previous tag for client
846 client.update_req_tag(tag, tick);
847 #endif
848
849 client.add_request(tag, client.client, std::move(request));
850 if (1 == client.requests.size()) {
851 // NB: can the following 4 calls to adjust be changed
852 // promote? Can adding a request ever demote a client in the
853 // heaps?
854 resv_heap.adjust(client);
855 limit_heap.adjust(client);
856 ready_heap.adjust(client);
857 #if USE_PROP_HEAP
858 prop_heap.adjust(client);
859 #endif
860 }
861
862 client.cur_rho = req_params.rho;
863 client.cur_delta = req_params.delta;
864
865 resv_heap.adjust(client);
866 limit_heap.adjust(client);
867 ready_heap.adjust(client);
868 #if USE_PROP_HEAP
869 prop_heap.adjust(client);
870 #endif
871 } // add_request
872
873
874 // data_mtx should be held when called; top of heap should have
875 // a ready request
876 template<typename C1, IndIntruHeapData ClientRec::*C2, typename C3>
877 void pop_process_request(IndIntruHeap<C1, ClientRec, C2, C3, B>& heap,
878 std::function<void(const C& client,
879 RequestRef& request)> process) {
880 // gain access to data
881 ClientRec& top = heap.top();
882
883 RequestRef request = std::move(top.next_request().request);
884 RequestTag tag = top.next_request().tag;
885
886 // pop request and adjust heaps
887 top.pop_request();
888
889 #ifndef DO_NOT_DELAY_TAG_CALC
890 if (top.has_request()) {
891 ClientReq& next_first = top.next_request();
892 next_first.tag = RequestTag(tag, top.info,
893 top.cur_delta, top.cur_rho,
894 next_first.tag.arrival);
895
896 // copy tag to previous tag for client
897 top.update_req_tag(next_first.tag, tick);
898 }
899 #endif
900
901 resv_heap.demote(top);
902 limit_heap.adjust(top);
903 #if USE_PROP_HEAP
904 prop_heap.demote(top);
905 #endif
906 ready_heap.demote(top);
907
908 // process
909 process(top.client, request);
910 } // pop_process_request
911
912
913 // data_mtx should be held when called
914 void reduce_reservation_tags(ClientRec& client) {
915 for (auto& r : client.requests) {
916 r.tag.reservation -= client.info.reservation_inv;
917
918 #ifndef DO_NOT_DELAY_TAG_CALC
919 // reduce only for front tag. because next tags' value are invalid
920 break;
921 #endif
922 }
923 // don't forget to update previous tag
924 client.prev_tag.reservation -= client.info.reservation_inv;
925 resv_heap.promote(client);
926 }
927
928
929 // data_mtx should be held when called
930 void reduce_reservation_tags(const C& client_id) {
931 auto client_it = client_map.find(client_id);
932
933 // means the client was cleaned from map; should never happen
934 // as long as cleaning times are long enough
935 assert(client_map.end() != client_it);
936 reduce_reservation_tags(*client_it->second);
937 }
938
939
940 // data_mtx should be held when called
941 NextReq do_next_request(Time now) {
942 NextReq result;
943
944 // if reservation queue is empty, all are empty (i.e., no active clients)
945 if(resv_heap.empty()) {
946 result.type = NextReqType::none;
947 return result;
948 }
949
950 // try constraint (reservation) based scheduling
951
952 auto& reserv = resv_heap.top();
953 if (reserv.has_request() &&
954 reserv.next_request().tag.reservation <= now) {
955 result.type = NextReqType::returning;
956 result.heap_id = HeapId::reservation;
957 return result;
958 }
959
960 // no existing reservations before now, so try weight-based
961 // scheduling
962
963 // all items that are within limit are eligible based on
964 // priority
965 auto limits = &limit_heap.top();
966 while (limits->has_request() &&
967 !limits->next_request().tag.ready &&
968 limits->next_request().tag.limit <= now) {
969 limits->next_request().tag.ready = true;
970 ready_heap.promote(*limits);
971 limit_heap.demote(*limits);
972
973 limits = &limit_heap.top();
974 }
975
976 auto& readys = ready_heap.top();
977 if (readys.has_request() &&
978 readys.next_request().tag.ready &&
979 readys.next_request().tag.proportion < max_tag) {
980 result.type = NextReqType::returning;
981 result.heap_id = HeapId::ready;
982 return result;
983 }
984
985 // if nothing is schedulable by reservation or
986 // proportion/weight, and if we allow limit break, try to
987 // schedule something with the lowest proportion tag or
988 // alternatively lowest reservation tag.
989 if (allow_limit_break) {
990 if (readys.has_request() &&
991 readys.next_request().tag.proportion < max_tag) {
992 result.type = NextReqType::returning;
993 result.heap_id = HeapId::ready;
994 return result;
995 } else if (reserv.has_request() &&
996 reserv.next_request().tag.reservation < max_tag) {
997 result.type = NextReqType::returning;
998 result.heap_id = HeapId::reservation;
999 return result;
1000 }
1001 }
1002
1003 // nothing scheduled; make sure we re-run when next
1004 // reservation item or next limited item comes up
1005
1006 Time next_call = TimeMax;
1007 if (resv_heap.top().has_request()) {
1008 next_call =
1009 min_not_0_time(next_call,
1010 resv_heap.top().next_request().tag.reservation);
1011 }
1012 if (limit_heap.top().has_request()) {
1013 const auto& next = limit_heap.top().next_request();
1014 assert(!next.tag.ready || max_tag == next.tag.proportion);
1015 next_call = min_not_0_time(next_call, next.tag.limit);
1016 }
1017 if (next_call < TimeMax) {
1018 result.type = NextReqType::future;
1019 result.when_ready = next_call;
1020 return result;
1021 } else {
1022 result.type = NextReqType::none;
1023 return result;
1024 }
1025 } // do_next_request
1026
1027
1028 // if possible is not zero and less than current then return it;
1029 // otherwise return current; the idea is we're trying to find
1030 // the minimal time but ignoring zero
1031 static inline const Time& min_not_0_time(const Time& current,
1032 const Time& possible) {
1033 return TimeZero == possible ? current : std::min(current, possible);
1034 }
1035
1036
1037 /*
1038 * This is being called regularly by RunEvery. Every time it's
1039 * called it notes the time and delta counter (mark point) in a
1040 * deque. It also looks at the deque to find the most recent
1041 * mark point that is older than clean_age. It then walks the
1042 * map and delete all server entries that were last used before
1043 * that mark point.
1044 */
1045 void do_clean() {
1046 TimePoint now = std::chrono::steady_clock::now();
1047 DataGuard g(data_mtx);
1048 clean_mark_points.emplace_back(MarkPoint(now, tick));
1049
1050 // first erase the super-old client records
1051
1052 Counter erase_point = 0;
1053 auto point = clean_mark_points.front();
1054 while (point.first <= now - erase_age) {
1055 erase_point = point.second;
1056 clean_mark_points.pop_front();
1057 point = clean_mark_points.front();
1058 }
1059
1060 Counter idle_point = 0;
1061 for (auto i : clean_mark_points) {
1062 if (i.first <= now - idle_age) {
1063 idle_point = i.second;
1064 } else {
1065 break;
1066 }
1067 }
1068
1069 if (erase_point > 0 || idle_point > 0) {
1070 for (auto i = client_map.begin(); i != client_map.end(); /* empty */) {
1071 auto i2 = i++;
1072 if (erase_point && i2->second->last_tick <= erase_point) {
1073 delete_from_heaps(i2->second);
1074 client_map.erase(i2);
1075 } else if (idle_point && i2->second->last_tick <= idle_point) {
1076 i2->second->idle = true;
1077 }
1078 } // for
1079 } // if
1080 } // do_clean
1081
1082
1083 // data_mtx must be held by caller
1084 template<IndIntruHeapData ClientRec::*C1,typename C2>
1085 void delete_from_heap(ClientRecRef& client,
1086 c::IndIntruHeap<ClientRecRef,ClientRec,C1,C2,B>& heap) {
1087 auto i = heap.rfind(client);
1088 heap.remove(i);
1089 }
1090
1091
1092 // data_mtx must be held by caller
1093 void delete_from_heaps(ClientRecRef& client) {
1094 delete_from_heap(client, resv_heap);
1095 #if USE_PROP_HEAP
1096 delete_from_heap(client, prop_heap);
1097 #endif
1098 delete_from_heap(client, limit_heap);
1099 delete_from_heap(client, ready_heap);
1100 }
1101 }; // class PriorityQueueBase
1102
1103
1104 template<typename C, typename R, uint B=2>
1105 class PullPriorityQueue : public PriorityQueueBase<C,R,B> {
1106 using super = PriorityQueueBase<C,R,B>;
1107
1108 public:
1109
1110 // When a request is pulled, this is the return type.
1111 struct PullReq {
1112 struct Retn {
1113 C client;
1114 typename super::RequestRef request;
1115 PhaseType phase;
1116 };
1117
1118 typename super::NextReqType type;
1119 boost::variant<Retn,Time> data;
1120
1121 bool is_none() const { return type == super::NextReqType::none; }
1122
1123 bool is_retn() const { return type == super::NextReqType::returning; }
1124 Retn& get_retn() {
1125 return boost::get<Retn>(data);
1126 }
1127
1128 bool is_future() const { return type == super::NextReqType::future; }
1129 Time getTime() const { return boost::get<Time>(data); }
1130 };
1131
1132
1133 #ifdef PROFILE
1134 ProfileTimer<std::chrono::nanoseconds> pull_request_timer;
1135 ProfileTimer<std::chrono::nanoseconds> add_request_timer;
1136 #endif
1137
1138 template<typename Rep, typename Per>
1139 PullPriorityQueue(typename super::ClientInfoFunc _client_info_f,
1140 std::chrono::duration<Rep,Per> _idle_age,
1141 std::chrono::duration<Rep,Per> _erase_age,
1142 std::chrono::duration<Rep,Per> _check_time,
1143 bool _allow_limit_break = false) :
1144 super(_client_info_f,
1145 _idle_age, _erase_age, _check_time,
1146 _allow_limit_break)
1147 {
1148 // empty
1149 }
1150
1151
1152 // pull convenience constructor
1153 PullPriorityQueue(typename super::ClientInfoFunc _client_info_f,
1154 bool _allow_limit_break = false) :
1155 PullPriorityQueue(_client_info_f,
1156 std::chrono::minutes(10),
1157 std::chrono::minutes(15),
1158 std::chrono::minutes(6),
1159 _allow_limit_break)
1160 {
1161 // empty
1162 }
1163
1164
1165 inline void add_request(const R& request,
1166 const C& client_id,
1167 const ReqParams& req_params,
1168 double addl_cost = 0.0) {
1169 add_request(typename super::RequestRef(new R(request)),
1170 client_id,
1171 req_params,
1172 get_time(),
1173 addl_cost);
1174 }
1175
1176
1177 inline void add_request(const R& request,
1178 const C& client_id,
1179 double addl_cost = 0.0) {
1180 static const ReqParams null_req_params;
1181 add_request(typename super::RequestRef(new R(request)),
1182 client_id,
1183 null_req_params,
1184 get_time(),
1185 addl_cost);
1186 }
1187
1188
1189
1190 inline void add_request_time(const R& request,
1191 const C& client_id,
1192 const ReqParams& req_params,
1193 const Time time,
1194 double addl_cost = 0.0) {
1195 add_request(typename super::RequestRef(new R(request)),
1196 client_id,
1197 req_params,
1198 time,
1199 addl_cost);
1200 }
1201
1202
1203 inline void add_request(typename super::RequestRef&& request,
1204 const C& client_id,
1205 const ReqParams& req_params,
1206 double addl_cost = 0.0) {
1207 add_request(request, req_params, client_id, get_time(), addl_cost);
1208 }
1209
1210
1211 inline void add_request(typename super::RequestRef&& request,
1212 const C& client_id,
1213 double addl_cost = 0.0) {
1214 static const ReqParams null_req_params;
1215 add_request(request, null_req_params, client_id, get_time(), addl_cost);
1216 }
1217
1218
1219 // this does the work; the versions above provide alternate interfaces
1220 void add_request(typename super::RequestRef&& request,
1221 const C& client_id,
1222 const ReqParams& req_params,
1223 const Time time,
1224 double addl_cost = 0.0) {
1225 typename super::DataGuard g(this->data_mtx);
1226 #ifdef PROFILE
1227 add_request_timer.start();
1228 #endif
1229 super::do_add_request(std::move(request),
1230 client_id,
1231 req_params,
1232 time,
1233 addl_cost);
1234 // no call to schedule_request for pull version
1235 #ifdef PROFILE
1236 add_request_timer.stop();
1237 #endif
1238 }
1239
1240
1241 inline PullReq pull_request() {
1242 return pull_request(get_time());
1243 }
1244
1245
1246 PullReq pull_request(Time now) {
1247 PullReq result;
1248 typename super::DataGuard g(this->data_mtx);
1249 #ifdef PROFILE
1250 pull_request_timer.start();
1251 #endif
1252
1253 typename super::NextReq next = super::do_next_request(now);
1254 result.type = next.type;
1255 switch(next.type) {
1256 case super::NextReqType::none:
1257 return result;
1258 break;
1259 case super::NextReqType::future:
1260 result.data = next.when_ready;
1261 return result;
1262 break;
1263 case super::NextReqType::returning:
1264 // to avoid nesting, break out and let code below handle this case
1265 break;
1266 default:
1267 assert(false);
1268 }
1269
1270 // we'll only get here if we're returning an entry
1271
1272 auto process_f =
1273 [&] (PullReq& pull_result, PhaseType phase) ->
1274 std::function<void(const C&,
1275 typename super::RequestRef&)> {
1276 return [&pull_result, phase](const C& client,
1277 typename super::RequestRef& request) {
1278 pull_result.data =
1279 typename PullReq::Retn{client, std::move(request), phase};
1280 };
1281 };
1282
1283 switch(next.heap_id) {
1284 case super::HeapId::reservation:
1285 super::pop_process_request(this->resv_heap,
1286 process_f(result, PhaseType::reservation));
1287 ++this->reserv_sched_count;
1288 break;
1289 case super::HeapId::ready:
1290 super::pop_process_request(this->ready_heap,
1291 process_f(result, PhaseType::priority));
1292 { // need to use retn temporarily
1293 auto& retn = boost::get<typename PullReq::Retn>(result.data);
1294 super::reduce_reservation_tags(retn.client);
1295 }
1296 ++this->prop_sched_count;
1297 break;
1298 default:
1299 assert(false);
1300 }
1301
1302 #ifdef PROFILE
1303 pull_request_timer.stop();
1304 #endif
1305 return result;
1306 } // pull_request
1307
1308
1309 protected:
1310
1311
1312 // data_mtx should be held when called; unfortunately this
1313 // function has to be repeated in both push & pull
1314 // specializations
1315 typename super::NextReq next_request() {
1316 return next_request(get_time());
1317 }
1318 }; // class PullPriorityQueue
1319
1320
1321 // PUSH version
1322 template<typename C, typename R, uint B=2>
1323 class PushPriorityQueue : public PriorityQueueBase<C,R,B> {
1324
1325 protected:
1326
1327 using super = PriorityQueueBase<C,R,B>;
1328
1329 public:
1330
1331 // a function to see whether the server can handle another request
1332 using CanHandleRequestFunc = std::function<bool(void)>;
1333
1334 // a function to submit a request to the server; the second
1335 // parameter is a callback when it's completed
1336 using HandleRequestFunc =
1337 std::function<void(const C&,typename super::RequestRef,PhaseType)>;
1338
1339 protected:
1340
1341 CanHandleRequestFunc can_handle_f;
1342 HandleRequestFunc handle_f;
1343 // for handling timed scheduling
1344 std::mutex sched_ahead_mtx;
1345 std::condition_variable sched_ahead_cv;
1346 Time sched_ahead_when = TimeZero;
1347
1348 #ifdef PROFILE
1349 public:
1350 ProfileTimer<std::chrono::nanoseconds> add_request_timer;
1351 ProfileTimer<std::chrono::nanoseconds> request_complete_timer;
1352 protected:
1353 #endif
1354
1355 // NB: threads declared last, so constructed last and destructed first
1356
1357 std::thread sched_ahead_thd;
1358
1359 public:
1360
1361 // push full constructor
1362 template<typename Rep, typename Per>
1363 PushPriorityQueue(typename super::ClientInfoFunc _client_info_f,
1364 CanHandleRequestFunc _can_handle_f,
1365 HandleRequestFunc _handle_f,
1366 std::chrono::duration<Rep,Per> _idle_age,
1367 std::chrono::duration<Rep,Per> _erase_age,
1368 std::chrono::duration<Rep,Per> _check_time,
1369 bool _allow_limit_break = false) :
1370 super(_client_info_f,
1371 _idle_age, _erase_age, _check_time,
1372 _allow_limit_break)
1373 {
1374 can_handle_f = _can_handle_f;
1375 handle_f = _handle_f;
1376 sched_ahead_thd = std::thread(&PushPriorityQueue::run_sched_ahead, this);
1377 }
1378
1379
1380 // push convenience constructor
1381 PushPriorityQueue(typename super::ClientInfoFunc _client_info_f,
1382 CanHandleRequestFunc _can_handle_f,
1383 HandleRequestFunc _handle_f,
1384 bool _allow_limit_break = false) :
1385 PushPriorityQueue(_client_info_f,
1386 _can_handle_f,
1387 _handle_f,
1388 std::chrono::minutes(10),
1389 std::chrono::minutes(15),
1390 std::chrono::minutes(6),
1391 _allow_limit_break)
1392 {
1393 // empty
1394 }
1395
1396
1397 ~PushPriorityQueue() {
1398 this->finishing = true;
1399 sched_ahead_cv.notify_one();
1400 sched_ahead_thd.join();
1401 }
1402
1403 public:
1404
1405 inline void add_request(const R& request,
1406 const C& client_id,
1407 const ReqParams& req_params,
1408 double addl_cost = 0.0) {
1409 add_request(typename super::RequestRef(new R(request)),
1410 client_id,
1411 req_params,
1412 get_time(),
1413 addl_cost);
1414 }
1415
1416
1417 inline void add_request(typename super::RequestRef&& request,
1418 const C& client_id,
1419 const ReqParams& req_params,
1420 double addl_cost = 0.0) {
1421 add_request(request, req_params, client_id, get_time(), addl_cost);
1422 }
1423
1424
1425 inline void add_request_time(const R& request,
1426 const C& client_id,
1427 const ReqParams& req_params,
1428 const Time time,
1429 double addl_cost = 0.0) {
1430 add_request(typename super::RequestRef(new R(request)),
1431 client_id,
1432 req_params,
1433 time,
1434 addl_cost);
1435 }
1436
1437
1438 void add_request(typename super::RequestRef&& request,
1439 const C& client_id,
1440 const ReqParams& req_params,
1441 const Time time,
1442 double addl_cost = 0.0) {
1443 typename super::DataGuard g(this->data_mtx);
1444 #ifdef PROFILE
1445 add_request_timer.start();
1446 #endif
1447 super::do_add_request(std::move(request),
1448 client_id,
1449 req_params,
1450 time,
1451 addl_cost);
1452 schedule_request();
1453 #ifdef PROFILE
1454 add_request_timer.stop();
1455 #endif
1456 }
1457
1458
1459 void request_completed() {
1460 typename super::DataGuard g(this->data_mtx);
1461 #ifdef PROFILE
1462 request_complete_timer.start();
1463 #endif
1464 schedule_request();
1465 #ifdef PROFILE
1466 request_complete_timer.stop();
1467 #endif
1468 }
1469
1470 protected:
1471
1472 // data_mtx should be held when called; furthermore, the heap
1473 // should not be empty and the top element of the heap should
1474 // not be already handled
1475 //
1476 // NOTE: the use of "super::ClientRec" in either the template
1477 // construct or as a parameter to submit_top_request generated
1478 // a compiler error in g++ 4.8.4, when ClientRec was
1479 // "protected" rather than "public". By g++ 6.3.1 this was not
1480 // an issue. But for backwards compatibility
1481 // PriorityQueueBase::ClientRec is public.
1482 template<typename C1,
1483 IndIntruHeapData super::ClientRec::*C2,
1484 typename C3,
1485 uint B4>
1486 C submit_top_request(IndIntruHeap<C1,typename super::ClientRec,C2,C3,B4>& heap,
1487 PhaseType phase) {
1488 C client_result;
1489 super::pop_process_request(heap,
1490 [this, phase, &client_result]
1491 (const C& client,
1492 typename super::RequestRef& request) {
1493 client_result = client;
1494 handle_f(client, std::move(request), phase);
1495 });
1496 return client_result;
1497 }
1498
1499
1500 // data_mtx should be held when called
1501 void submit_request(typename super::HeapId heap_id) {
1502 C client;
1503 switch(heap_id) {
1504 case super::HeapId::reservation:
1505 // don't need to note client
1506 (void) submit_top_request(this->resv_heap, PhaseType::reservation);
1507 // unlike the other two cases, we do not reduce reservation
1508 // tags here
1509 ++this->reserv_sched_count;
1510 break;
1511 case super::HeapId::ready:
1512 client = submit_top_request(this->ready_heap, PhaseType::priority);
1513 super::reduce_reservation_tags(client);
1514 ++this->prop_sched_count;
1515 break;
1516 default:
1517 assert(false);
1518 }
1519 } // submit_request
1520
1521
1522 // data_mtx should be held when called; unfortunately this
1523 // function has to be repeated in both push & pull
1524 // specializations
1525 typename super::NextReq next_request() {
1526 return next_request(get_time());
1527 }
1528
1529
1530 // data_mtx should be held when called; overrides member
1531 // function in base class to add check for whether a request can
1532 // be pushed to the server
1533 typename super::NextReq next_request(Time now) {
1534 if (!can_handle_f()) {
1535 typename super::NextReq result;
1536 result.type = super::NextReqType::none;
1537 return result;
1538 } else {
1539 return super::do_next_request(now);
1540 }
1541 } // next_request
1542
1543
1544 // data_mtx should be held when called
1545 void schedule_request() {
1546 typename super::NextReq next_req = next_request();
1547 switch (next_req.type) {
1548 case super::NextReqType::none:
1549 return;
1550 case super::NextReqType::future:
1551 sched_at(next_req.when_ready);
1552 break;
1553 case super::NextReqType::returning:
1554 submit_request(next_req.heap_id);
1555 break;
1556 default:
1557 assert(false);
1558 }
1559 }
1560
1561
1562 // this is the thread that handles running schedule_request at
1563 // future times when nothing can be scheduled immediately
1564 void run_sched_ahead() {
1565 std::unique_lock<std::mutex> l(sched_ahead_mtx);
1566
1567 while (!this->finishing) {
1568 if (TimeZero == sched_ahead_when) {
1569 sched_ahead_cv.wait(l);
1570 } else {
1571 Time now;
1572 while (!this->finishing && (now = get_time()) < sched_ahead_when) {
1573 long microseconds_l = long(1 + 1000000 * (sched_ahead_when - now));
1574 auto microseconds = std::chrono::microseconds(microseconds_l);
1575 sched_ahead_cv.wait_for(l, microseconds);
1576 }
1577 sched_ahead_when = TimeZero;
1578 if (this->finishing) return;
1579
1580 l.unlock();
1581 if (!this->finishing) {
1582 typename super::DataGuard g(this->data_mtx);
1583 schedule_request();
1584 }
1585 l.lock();
1586 }
1587 }
1588 }
1589
1590
1591 void sched_at(Time when) {
1592 std::lock_guard<std::mutex> l(sched_ahead_mtx);
1593 if (this->finishing) return;
1594 if (TimeZero == sched_ahead_when || when < sched_ahead_when) {
1595 sched_ahead_when = when;
1596 sched_ahead_cv.notify_one();
1597 }
1598 }
1599 }; // class PushPriorityQueue
1600
1601 } // namespace dmclock
1602 } // namespace crimson