1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4/*
5 * Copyright (C) 2017 Red Hat Inc.
6 */
7
8
9#pragma once
10
11/* COMPILATION OPTIONS
12 *
13 * By default we include an optimization over the originally published
14 * dmclock algorithm: rather than using the rho and delta values that
15 * were sent in with a request, we use the most recent rho and delta
16 * values from the request's client. To restore the algorithm's
17 * original behavior, define DO_NOT_DELAY_TAG_CALC (i.e., compiler
18 * argument -DDO_NOT_DELAY_TAG_CALC).
19 *
20 * The prop_heap does not seem to be necessary. The only thing it
21 * would help with is quickly finding the minimum proportion/priority
22 * when an idle client becomes active. To have the code maintain the
23 * proportional heap, define USE_PROP_HEAP (i.e., compiler argument
24 * -DUSE_PROP_HEAP).
25 */
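/* For example (assuming a typical g++/clang build driven by make or
 * CMake; the variable name below is just the common convention),
 * either or both options can be enabled with:
 *
 *   CXXFLAGS += -DDO_NOT_DELAY_TAG_CALC -DUSE_PROP_HEAP
 */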
26
27#include <assert.h>
28
29#include <cmath>
30#include <memory>
31#include <map>
32#include <deque>
33#include <queue>
34#include <atomic>
35#include <mutex>
36#include <condition_variable>
37#include <thread>
38#include <iostream>
39#include <sstream>
40#include <limits>
41
42#include <boost/variant.hpp>
43
44#include "indirect_intrusive_heap.h"
45#include "run_every.h"
46#include "dmclock_util.h"
47#include "dmclock_recs.h"
48
49#ifdef PROFILE
50#include "profile.h"
51#endif
52
53#include "gtest/gtest_prod.h"
54
55
56namespace crimson {
57
58 namespace dmclock {
59
60 namespace c = crimson;
61
62 constexpr double max_tag = std::numeric_limits<double>::is_iec559 ?
63 std::numeric_limits<double>::infinity() :
64 std::numeric_limits<double>::max();
65 constexpr double min_tag = std::numeric_limits<double>::is_iec559 ?
66 -std::numeric_limits<double>::infinity() :
67 std::numeric_limits<double>::lowest();
68 constexpr uint tag_modulo = 1000000;
69
70 struct ClientInfo {
71 const double reservation; // minimum
72 const double weight; // proportional
73 const double limit; // maximum
74
75 // multiplicative inverses of above, which we use in calculations
76 // and don't want to recalculate repeatedly
77 const double reservation_inv;
78 const double weight_inv;
79 const double limit_inv;
80
81 // order parameters -- min, "normal", max
82 ClientInfo(double _reservation, double _weight, double _limit) :
83 reservation(_reservation),
84 weight(_weight),
85 limit(_limit),
86 reservation_inv(0.0 == reservation ? 0.0 : 1.0 / reservation),
87 weight_inv( 0.0 == weight ? 0.0 : 1.0 / weight),
88 limit_inv( 0.0 == limit ? 0.0 : 1.0 / limit)
89 {
90 // empty
91 }
92
93
94 friend std::ostream& operator<<(std::ostream& out,
95 const ClientInfo& client) {
96 out <<
97 "{ ClientInfo:: r:" << client.reservation <<
98 " w:" << std::fixed << client.weight <<
99 " l:" << std::fixed << client.limit <<
100 " 1/r:" << std::fixed << client.reservation_inv <<
101 " 1/w:" << std::fixed << client.weight_inv <<
102 " 1/l:" << std::fixed << client.limit_inv <<
103 " }";
104 return out;
105 }
106 }; // class ClientInfo
107
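    /* A usage sketch (the numbers here are hypothetical, not
     * defaults): a client entitled to at least 100 ops/sec, weighted
     * 2.0 relative to other clients, and capped at 500 ops/sec would
     * be described as
     *
     *   ClientInfo ci(100.0, 2.0, 500.0);
     *
     * giving reservation_inv = 0.01, weight_inv = 0.5, and
     * limit_inv = 0.002 -- the per-request increments used in the tag
     * calculations below.
     */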
108
109 struct RequestTag {
110 double reservation;
111 double proportion;
112 double limit;
113 bool ready; // true when within limit
114#ifndef DO_NOT_DELAY_TAG_CALC
115 Time arrival;
116#endif
117
118 RequestTag(const RequestTag& prev_tag,
119 const ClientInfo& client,
120 const ReqParams& req_params,
121 const Time& time,
122 const double cost = 0.0) :
123 reservation(cost + tag_calc(time,
124 prev_tag.reservation,
125 client.reservation_inv,
126 req_params.rho,
127 true)),
128 proportion(tag_calc(time,
129 prev_tag.proportion,
130 client.weight_inv,
131 req_params.delta,
132 true)),
133 limit(tag_calc(time,
134 prev_tag.limit,
135 client.limit_inv,
136 req_params.delta,
137 false)),
138 ready(false)
139#ifndef DO_NOT_DELAY_TAG_CALC
140 , arrival(time)
141#endif
142 {
143 assert(reservation < max_tag || proportion < max_tag);
144 }
145
146 RequestTag(double _res, double _prop, double _lim, const Time& _arrival) :
147 reservation(_res),
148 proportion(_prop),
149 limit(_lim),
150 ready(false)
151#ifndef DO_NOT_DELAY_TAG_CALC
152 , arrival(_arrival)
153#endif
154 {
155 assert(reservation < max_tag || proportion < max_tag);
156 }
157
158 RequestTag(const RequestTag& other) :
159 reservation(other.reservation),
160 proportion(other.proportion),
161 limit(other.limit),
162 ready(other.ready)
163#ifndef DO_NOT_DELAY_TAG_CALC
164 , arrival(other.arrival)
165#endif
166 {
167 // empty
168 }
169
170 static std::string format_tag_change(double before, double after) {
171 if (before == after) {
172 return std::string("same");
173 } else {
174 std::stringstream ss;
175 ss << format_tag(before) << "=>" << format_tag(after);
176 return ss.str();
177 }
178 }
179
180 static std::string format_tag(double value) {
181 if (max_tag == value) {
182 return std::string("max");
183 } else if (min_tag == value) {
184 return std::string("min");
185 } else {
186 return format_time(value, tag_modulo);
187 }
188 }
189
190 private:
191
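      /* A worked sketch of the calculation done by tag_calc below,
       * using hypothetical numbers: with a reservation of 100 ops/sec
       * (so increment = reservation_inv = 0.01) and rho = 5, the new
       * reservation tag is
       *
       *   max(time, prev + 5 * 0.01)
       *
       * i.e., the tag advances past the previous tag by rho/reservation
       * but is never allowed to fall behind the current time. An
       * increment of 0.0 pins the tag to max_tag or min_tag.
       */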
192 static double tag_calc(const Time& time,
193 double prev,
194 double increment,
195 uint32_t dist_req_val,
196 bool extreme_is_high) {
197 if (0.0 == increment) {
198 return extreme_is_high ? max_tag : min_tag;
199 } else {
200 if (0 != dist_req_val) {
201 increment *= dist_req_val;
202 }
203 return std::max(time, prev + increment);
204 }
205 }
206
207 friend std::ostream& operator<<(std::ostream& out,
208 const RequestTag& tag) {
209 out <<
210 "{ RequestTag:: ready:" << (tag.ready ? "true" : "false") <<
211 " r:" << format_tag(tag.reservation) <<
212 " p:" << format_tag(tag.proportion) <<
213 " l:" << format_tag(tag.limit) <<
214#if 0 // try to resolve this to make sure Time is operator<<'able.
215#ifndef DO_NOT_DELAY_TAG_CALC
216 " arrival:" << tag.arrival <<
217#endif
218#endif
219 " }";
220 return out;
221 }
222 }; // class RequestTag
223
224
225 // C is client identifier type, R is request type, B is heap
226 // branching factor
227 template<typename C, typename R, uint B>
228 class PriorityQueueBase {
229 FRIEND_TEST(dmclock_server, client_idle_erase);
230
231 public:
232
233 using RequestRef = std::unique_ptr<R>;
234
235 protected:
236
237 using TimePoint = decltype(std::chrono::steady_clock::now());
238 using Duration = std::chrono::milliseconds;
239 using MarkPoint = std::pair<TimePoint,Counter>;
240
241 enum class ReadyOption {ignore, lowers, raises};
242
243 // forward decl for friend decls
244 template<double RequestTag::*, ReadyOption, bool>
245 struct ClientCompare;
246
247 class ClientReq {
248 friend PriorityQueueBase;
249
250 RequestTag tag;
251 C client_id;
252 RequestRef request;
253
254 public:
255
256 ClientReq(const RequestTag& _tag,
257 const C& _client_id,
258 RequestRef&& _request) :
259 tag(_tag),
260 client_id(_client_id),
261 request(std::move(_request))
262 {
263 // empty
264 }
265
266 friend std::ostream& operator<<(std::ostream& out, const ClientReq& c) {
267 out << "{ ClientReq:: tag:" << c.tag << " client:" <<
268 c.client_id << " }";
269 return out;
270 }
271 }; // class ClientReq
272
273 public:
274
275 // NOTE: ClientRec is in the "public" section for compatibility
276 // with g++ 4.8.4, which complains if it's not. By g++ 6.3.1
277 // ClientRec could be "protected" with no issue. [See comments
278 // associated with function submit_top_request.]
279 class ClientRec {
280 friend PriorityQueueBase<C,R,B>;
281
282 C client;
283 RequestTag prev_tag;
284 std::deque<ClientReq> requests;
285
286 // amount added to the proportion tag as a result of
287 // an idle client becoming active again
288 double prop_delta = 0.0;
289
290 c::IndIntruHeapData reserv_heap_data;
291 c::IndIntruHeapData lim_heap_data;
292 c::IndIntruHeapData ready_heap_data;
293#if USE_PROP_HEAP
294 c::IndIntruHeapData prop_heap_data;
295#endif
296
297 public:
298
299 ClientInfo info;
300 bool idle;
301 Counter last_tick;
302 uint32_t cur_rho;
303 uint32_t cur_delta;
304
305 ClientRec(C _client,
306 const ClientInfo& _info,
307 Counter current_tick) :
308 client(_client),
309 prev_tag(0.0, 0.0, 0.0, TimeZero),
310 info(_info),
311 idle(true),
312 last_tick(current_tick),
313 cur_rho(1),
314 cur_delta(1)
315 {
316 // empty
317 }
318
319 inline const RequestTag& get_req_tag() const {
320 return prev_tag;
321 }
322
323 static inline void assign_unpinned_tag(double& lhs, const double rhs) {
324 if (rhs != max_tag && rhs != min_tag) {
325 lhs = rhs;
326 }
327 }
328
329 inline void update_req_tag(const RequestTag& _prev,
330 const Counter& _tick) {
331 assign_unpinned_tag(prev_tag.reservation, _prev.reservation);
332 assign_unpinned_tag(prev_tag.limit, _prev.limit);
333 assign_unpinned_tag(prev_tag.proportion, _prev.proportion);
334 last_tick = _tick;
335 }
336
337 inline void add_request(const RequestTag& tag,
338 const C& client_id,
339 RequestRef&& request) {
340 requests.emplace_back(ClientReq(tag, client_id, std::move(request)));
341 }
342
343 inline const ClientReq& next_request() const {
344 return requests.front();
345 }
346
347 inline ClientReq& next_request() {
348 return requests.front();
349 }
350
351 inline void pop_request() {
352 requests.pop_front();
353 }
354
355 inline bool has_request() const {
356 return !requests.empty();
357 }
358
359 inline size_t request_count() const {
360 return requests.size();
361 }
362
363 // NB: because a deque is the underlying structure, this
364 // operation might be expensive
365 bool remove_by_req_filter_fw(std::function<bool(const R&)> filter_accum) {
366 bool any_removed = false;
367 for (auto i = requests.begin();
368 i != requests.end();
369 /* no inc */) {
370 if (filter_accum(*i->request)) {
371 any_removed = true;
372 i = requests.erase(i);
373 } else {
374 ++i;
375 }
376 }
377 return any_removed;
378 }
379
380 // NB: because a deque is the underlying structure, this
381 // operation might be expensive
382 bool remove_by_req_filter_bw(std::function<bool(const R&)> filter_accum) {
383 bool any_removed = false;
384 for (auto i = requests.rbegin();
385 i != requests.rend();
386 /* no inc */) {
387 if (filter_accum(*i->request)) {
388 any_removed = true;
389 i = decltype(i){ requests.erase(std::next(i).base()) };
390 } else {
391 ++i;
392 }
393 }
394 return any_removed;
395 }
396
397 inline bool
398 remove_by_req_filter(std::function<bool(const R&)> filter_accum,
399 bool visit_backwards) {
400 if (visit_backwards) {
401 return remove_by_req_filter_bw(filter_accum);
402 } else {
403 return remove_by_req_filter_fw(filter_accum);
404 }
405 }
406
407 friend std::ostream&
408 operator<<(std::ostream& out,
409 const typename PriorityQueueBase<C,R,B>::ClientRec& e) {
410 out << "{ ClientRec::" <<
411 " client:" << e.client <<
412 " prev_tag:" << e.prev_tag <<
413 " req_count:" << e.requests.size() <<
414 " top_req:";
415 if (e.has_request()) {
416 out << e.next_request();
417 } else {
418 out << "none";
419 }
420 out << " }";
421
422 return out;
423 }
424 }; // class ClientRec
425
426 using ClientRecRef = std::shared_ptr<ClientRec>;
427
428 // when we try to get the next request, we'll be in one of three
429 // situations -- we'll have one to return, have one that can
430 // fire in the future, or not have any
431 enum class NextReqType { returning, future, none };
432
433 // specifies which queue next request will get popped from
434 enum class HeapId { reservation, ready };
435
436 // this is returned from next_req to tell the caller the situation
437 struct NextReq {
438 NextReqType type;
439 union {
440 HeapId heap_id;
441 Time when_ready;
442 };
443 };
444
445
446 // a function that can be called to look up client information
447 using ClientInfoFunc = std::function<ClientInfo(const C&)>;
448
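      /* For example (a sketch with hypothetical values), a queue keyed
       * by an integer client id might use:
       *
       *   ClientInfoFunc f = [](const int& id) {
       *     return ClientInfo(100.0, 1.0, 250.0); // res, weight, limit
       *   };
       */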
449
450 bool empty() const {
451 DataGuard g(data_mtx);
452 return (resv_heap.empty() || ! resv_heap.top().has_request());
453 }
454
455
456 size_t client_count() const {
457 DataGuard g(data_mtx);
458 return resv_heap.size();
459 }
460
461
462 size_t request_count() const {
463 DataGuard g(data_mtx);
464 size_t total = 0;
465 for (auto i = resv_heap.cbegin(); i != resv_heap.cend(); ++i) {
466 total += i->request_count();
467 }
468 return total;
469 }
470
471
472 bool remove_by_req_filter(std::function<bool(const R&)> filter_accum,
473 bool visit_backwards = false) {
474 bool any_removed = false;
475 DataGuard g(data_mtx);
476 for (auto i : client_map) {
477 bool modified =
478 i.second->remove_by_req_filter(filter_accum, visit_backwards);
479 if (modified) {
480 resv_heap.adjust(*i.second);
481 limit_heap.adjust(*i.second);
482 ready_heap.adjust(*i.second);
483#if USE_PROP_HEAP
484 prop_heap.adjust(*i.second);
485#endif
486 any_removed = true;
487 }
488 }
489 return any_removed;
490 }
491
492
493 // used as a default when no accumulator function is provided
494 static void request_sink(const R& req) {
495 // do nothing
496 }
497
498
499 void remove_by_client(const C& client,
500 bool reverse = false,
501 std::function<void (const R&)> accum = request_sink) {
502 DataGuard g(data_mtx);
503
504 auto i = client_map.find(client);
505
506 if (i == client_map.end()) return;
507
508 if (reverse) {
509 for (auto j = i->second->requests.rbegin();
510 j != i->second->requests.rend();
511 ++j) {
512 accum(*j->request);
513 }
514 } else {
515 for (auto j = i->second->requests.begin();
516 j != i->second->requests.end();
517 ++j) {
518 accum(*j->request);
519 }
520 }
521
522 i->second->requests.clear();
523
524 resv_heap.adjust(*i->second);
525 limit_heap.adjust(*i->second);
526 ready_heap.adjust(*i->second);
527#if USE_PROP_HEAP
528 prop_heap.adjust(*i->second);
529#endif
530 }
531
532
533 uint get_heap_branching_factor() const {
534 return B;
535 }
536
537
538 friend std::ostream& operator<<(std::ostream& out,
539 const PriorityQueueBase& q) {
540 std::lock_guard<decltype(q.data_mtx)> guard(q.data_mtx);
541
542 out << "{ PriorityQueue::";
543 for (const auto& c : q.client_map) {
544 out << " { client:" << c.first << ", record:" << *c.second <<
545 " }";
546 }
547 if (!q.resv_heap.empty()) {
548 const auto& resv = q.resv_heap.top();
549 out << " { reservation_top:" << resv << " }";
550 const auto& ready = q.ready_heap.top();
551 out << " { ready_top:" << ready << " }";
552 const auto& limit = q.limit_heap.top();
553 out << " { limit_top:" << limit << " }";
554 } else {
555 out << " HEAPS-EMPTY";
556 }
557 out << " }";
558
559 return out;
560 }
561
562 // for debugging
563 void display_queues(std::ostream& out,
564 bool show_res = true,
565 bool show_lim = true,
566 bool show_ready = true,
567 bool show_prop = true) const {
568 auto filter = [](const ClientRec& e)->bool { return true; };
569 DataGuard g(data_mtx);
570 if (show_res) {
571 resv_heap.display_sorted(out << "RESER:", filter);
572 }
573 if (show_lim) {
574 limit_heap.display_sorted(out << "LIMIT:", filter);
575 }
576 if (show_ready) {
577 ready_heap.display_sorted(out << "READY:", filter);
578 }
579#if USE_PROP_HEAP
580 if (show_prop) {
581 prop_heap.display_sorted(out << "PROPO:", filter);
582 }
583#endif
584 } // display_queues
585
586
587 protected:
588
589 // The ClientCompare functor is essentially doing a precedes?
590 // operator, returning true if and only if the first parameter
591 // must precede the second parameter. If the second must precede
592 // the first, or if they are equivalent, false should be
593 // returned. The reason for this behavior is that it will be
594 // called to test if two items are out of order and if true is
595 // returned it will reverse the items. Therefore false is the
596 // default return when it doesn't matter to prevent unnecessary
597 // re-ordering.
598 //
599 // The template supports variations in sorting based on the
600 // heap in question and allows these variations to be handled
601 // at compile time.
602 //
603 // tag_field determines which tag is being used for comparison
604 //
605 // ready_opt determines how the ready flag influences the sort
606 //
607 // use_prop_delta determines whether the proportional delta is
608 // added in for comparison
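      //
      // For example, the ready_heap defined below orders clients with
      //   ClientCompare<&RequestTag::proportion, ReadyOption::raises, true>
      // i.e., it sorts by proportion tag plus prop_delta, with ready
      // requests raised toward the top of the heap.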
609 template<double RequestTag::*tag_field,
610 ReadyOption ready_opt,
611 bool use_prop_delta>
612 struct ClientCompare {
613 bool operator()(const ClientRec& n1, const ClientRec& n2) const {
614 if (n1.has_request()) {
615 if (n2.has_request()) {
616 const auto& t1 = n1.next_request().tag;
617 const auto& t2 = n2.next_request().tag;
618 if (ReadyOption::ignore == ready_opt || t1.ready == t2.ready) {
619 // if we don't care about ready or the ready values are the same
620 if (use_prop_delta) {
621 return (t1.*tag_field + n1.prop_delta) <
622 (t2.*tag_field + n2.prop_delta);
623 } else {
624 return t1.*tag_field < t2.*tag_field;
625 }
626 } else if (ReadyOption::raises == ready_opt) {
627 // ready is being considered && the ready fields are different
628 return t1.ready;
629 } else {
630 return t2.ready;
631 }
632 } else {
633 // n1 has request but n2 does not
634 return true;
635 }
636 } else if (n2.has_request()) {
637 // n2 has request but n1 does not
638 return false;
639 } else {
640 // neither has a request; keep order stable by returning false
641 return false;
642 }
643 }
644 };
645
646 ClientInfoFunc client_info_f;
647
648 mutable std::mutex data_mtx;
649 using DataGuard = std::lock_guard<decltype(data_mtx)>;
650
651 // stable mapping between client ids and client queues
652 std::map<C,ClientRecRef> client_map;
653
654 c::IndIntruHeap<ClientRecRef,
655 ClientRec,
656 &ClientRec::reserv_heap_data,
657 ClientCompare<&RequestTag::reservation,
658 ReadyOption::ignore,
659 false>,
660 B> resv_heap;
661#if USE_PROP_HEAP
662 c::IndIntruHeap<ClientRecRef,
663 ClientRec,
664 &ClientRec::prop_heap_data,
665 ClientCompare<&RequestTag::proportion,
666 ReadyOption::ignore,
667 true>,
668 B> prop_heap;
669#endif
670 c::IndIntruHeap<ClientRecRef,
671 ClientRec,
672 &ClientRec::lim_heap_data,
673 ClientCompare<&RequestTag::limit,
674 ReadyOption::lowers,
675 false>,
676 B> limit_heap;
677 c::IndIntruHeap<ClientRecRef,
678 ClientRec,
679 &ClientRec::ready_heap_data,
680 ClientCompare<&RequestTag::proportion,
681 ReadyOption::raises,
682 true>,
683 B> ready_heap;
684
685 // if all reservations are met and all remaining requests are over
686 // their limits, this will allow the request that is next in terms
687 // of proportion to still get issued
688 bool allow_limit_break;
689
690 std::atomic_bool finishing;
691
692 // every request creates a tick
693 Counter tick = 0;
694
695 // performance data collection
696 size_t reserv_sched_count = 0;
697 size_t prop_sched_count = 0;
698 size_t limit_break_sched_count = 0;
699
700 Duration idle_age;
701 Duration erase_age;
702 Duration check_time;
703 std::deque<MarkPoint> clean_mark_points;
704
705 // NB: All threads declared at end, so they're destructed first!
706
707 std::unique_ptr<RunEvery> cleaning_job;
708
709
710 // COMMON constructor that the others feed into; the three
711 // duration arguments may be any std::chrono duration type
712 template<typename Rep, typename Per>
713 PriorityQueueBase(ClientInfoFunc _client_info_f,
714 std::chrono::duration<Rep,Per> _idle_age,
715 std::chrono::duration<Rep,Per> _erase_age,
716 std::chrono::duration<Rep,Per> _check_time,
717 bool _allow_limit_break) :
718 client_info_f(_client_info_f),
719 allow_limit_break(_allow_limit_break),
720 finishing(false),
721 idle_age(std::chrono::duration_cast<Duration>(_idle_age)),
722 erase_age(std::chrono::duration_cast<Duration>(_erase_age)),
723 check_time(std::chrono::duration_cast<Duration>(_check_time))
724 {
725 assert(_erase_age >= _idle_age);
726 assert(_check_time < _idle_age);
727 cleaning_job =
728 std::unique_ptr<RunEvery>(
729 new RunEvery(check_time,
730 std::bind(&PriorityQueueBase::do_clean, this)));
731 }
732
733
734 ~PriorityQueueBase() {
735 finishing = true;
736 }
737
738
739 // data_mtx must be held by caller
740 void do_add_request(RequestRef&& request,
741 const C& client_id,
742 const ReqParams& req_params,
743 const Time time,
744 const double cost = 0.0) {
745 ++tick;
746
747 // this pointer lets us create a reference to the client record
748 // owned by a shared_ptr, no matter which of the two codepaths we take
749 ClientRec* temp_client;
750
751 auto client_it = client_map.find(client_id);
752 if (client_map.end() != client_it) {
753 temp_client = &(*client_it->second); // address of obj of shared_ptr
754 } else {
755 ClientInfo info = client_info_f(client_id);
756 ClientRecRef client_rec =
757 std::make_shared<ClientRec>(client_id, info, tick);
758 resv_heap.push(client_rec);
759#if USE_PROP_HEAP
760 prop_heap.push(client_rec);
761#endif
762 limit_heap.push(client_rec);
763 ready_heap.push(client_rec);
764 client_map[client_id] = client_rec;
765 temp_client = &(*client_rec); // address of obj of shared_ptr
766 }
767
768 // for convenience, we'll work through a reference to the client record
769 ClientRec& client = *temp_client;
770
771 if (client.idle) {
772 // We need to do an adjustment so that idle clients compete
773 // fairly on proportional tags since those tags may have
774 // drifted from real-time. Either use the lowest existing
775 // proportion tag -- O(1) -- or the client with the lowest
776 // previous proportion tag -- O(n) where n = # clients.
777 //
778 // Since we don't maintain a proportional queue that
779 // keeps the minimum proportion tag alone (we're
780 // instead using a ready queue), we have to check each
781 // client.
782 //
783 // The alternative would be to maintain a proportional queue
784 // (define USE_PROP_HEAP) and do an O(1) operation here.
785
786 // Was unable to confirm whether equality testing on
787 // std::numeric_limits<double>::max() is guaranteed, so
788 // we'll use a compile-time calculated trigger that is one
789 // third the max, which should be much larger than any
790 // expected organic value.
791 constexpr double lowest_prop_tag_trigger =
792 std::numeric_limits<double>::max() / 3.0;
793
794 double lowest_prop_tag = std::numeric_limits<double>::max();
795 for (auto const &c : client_map) {
796 // don't use ourselves (or anything else that might be
797 // listed as idle) since we're now in the map
798 if (!c.second->idle) {
799 double p;
800 // use either lowest proportion tag or previous proportion tag
801 if (c.second->has_request()) {
802 p = c.second->next_request().tag.proportion +
803 c.second->prop_delta;
804 } else {
805 p = c.second->get_req_tag().proportion + c.second->prop_delta;
806 }
807
808 if (p < lowest_prop_tag) {
809 lowest_prop_tag = p;
810 }
811 }
812 }
813
814 // if this conditional does not fire, no active client had a usable proportion tag, so prop_delta stays 0.0
815 if (lowest_prop_tag < lowest_prop_tag_trigger) {
816 client.prop_delta = lowest_prop_tag - time;
817 }
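	// For example (hypothetical numbers): if the lowest active
	// proportion tag found above is 1000.5 and the current time is
	// 1000.0, prop_delta becomes 0.5, and that offset is added to
	// this client's proportion tags whenever they are compared.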
818 client.idle = false;
819 } // if this client was idle
820
821#ifndef DO_NOT_DELAY_TAG_CALC
822 RequestTag tag(0, 0, 0, time);
823
824 if (!client.has_request()) {
825 tag = RequestTag(client.get_req_tag(), client.info,
826 req_params, time, cost);
827
828 // copy tag to previous tag for client
829 client.update_req_tag(tag, tick);
830 }
831#else
832 RequestTag tag(client.get_req_tag(), client.info, req_params, time, cost);
833 // copy tag to previous tag for client
834 client.update_req_tag(tag, tick);
835#endif
836
837 client.add_request(tag, client.client, std::move(request));
838 if (1 == client.requests.size()) {
839 // NB: can the following 4 calls to adjust be changed to
840 // promote? Can adding a request ever demote a client in the
841 // heaps?
842 resv_heap.adjust(client);
843 limit_heap.adjust(client);
844 ready_heap.adjust(client);
845#if USE_PROP_HEAP
846 prop_heap.adjust(client);
847#endif
848 }
849
850 client.cur_rho = req_params.rho;
851 client.cur_delta = req_params.delta;
852
853 resv_heap.adjust(client);
854 limit_heap.adjust(client);
855 ready_heap.adjust(client);
856#if USE_PROP_HEAP
857 prop_heap.adjust(client);
858#endif
859 } // add_request
860
861
862 // data_mtx should be held when called; top of heap should have
863 // a ready request
864 template<typename C1, IndIntruHeapData ClientRec::*C2, typename C3>
865 void pop_process_request(IndIntruHeap<C1, ClientRec, C2, C3, B>& heap,
866 std::function<void(const C& client,
867 RequestRef& request)> process) {
868 // gain access to data
869 ClientRec& top = heap.top();
870 ClientReq& first = top.next_request();
871 RequestRef request = std::move(first.request);
872
873 // pop request and adjust heaps
874 top.pop_request();
875
876#ifndef DO_NOT_DELAY_TAG_CALC
877 if (top.has_request()) {
878 ClientReq& next_first = top.next_request();
879 next_first.tag = RequestTag(first.tag, top.info,
880 ReqParams(top.cur_delta, top.cur_rho),
881 next_first.tag.arrival);
882
883 // copy tag to previous tag for client
884 top.update_req_tag(next_first.tag, tick);
885 }
886#endif
887
888 resv_heap.demote(top);
889 limit_heap.adjust(top);
890#if USE_PROP_HEAP
891 prop_heap.demote(top);
892#endif
893 ready_heap.demote(top);
894
895 // process
896 process(top.client, request);
897 } // pop_process_request
898
899
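      // A note on why reduce_reservation_tags exists (a sketch of the
      // rationale, based on how pull_request and submit_request use it
      // below): when a request is served out of the ready (proportion)
      // heap rather than the reservation heap, the client's outstanding
      // reservation tags are moved earlier by reservation_inv so that
      // proportion-phase service does not also count against the
      // client's reservation.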
900 // data_mtx should be held when called
901 void reduce_reservation_tags(ClientRec& client) {
902 for (auto& r : client.requests) {
903 r.tag.reservation -= client.info.reservation_inv;
904
905#ifndef DO_NOT_DELAY_TAG_CALC
906 // reduce only the front tag, because the later tags' values are invalid
907 break;
908#endif
909 }
910 // don't forget to update previous tag
911 client.prev_tag.reservation -= client.info.reservation_inv;
912 resv_heap.promote(client);
913 }
914
915
916 // data_mtx should be held when called
917 void reduce_reservation_tags(const C& client_id) {
918 auto client_it = client_map.find(client_id);
919
920 // means the client was cleaned from map; should never happen
921 // as long as cleaning times are long enough
922 assert(client_map.end() != client_it);
923 reduce_reservation_tags(*client_it->second);
924 }
925
926
927 // data_mtx should be held when called
928 NextReq do_next_request(Time now) {
929 NextReq result;
930
931 // if reservation queue is empty, all are empty (i.e., no active clients)
932 if(resv_heap.empty()) {
933 result.type = NextReqType::none;
934 return result;
935 }
936
937 // try constraint (reservation) based scheduling
938
939 auto& reserv = resv_heap.top();
940 if (reserv.has_request() &&
941 reserv.next_request().tag.reservation <= now) {
942 result.type = NextReqType::returning;
943 result.heap_id = HeapId::reservation;
944 return result;
945 }
946
947 // no existing reservations before now, so try weight-based
948 // scheduling
949
950 // all items that are within limit are eligible based on
951 // priority
952 auto limits = &limit_heap.top();
953 while (limits->has_request() &&
954 !limits->next_request().tag.ready &&
955 limits->next_request().tag.limit <= now) {
956 limits->next_request().tag.ready = true;
957 ready_heap.promote(*limits);
958 limit_heap.demote(*limits);
959
960 limits = &limit_heap.top();
961 }
962
963 auto& readys = ready_heap.top();
964 if (readys.has_request() &&
965 readys.next_request().tag.ready &&
966 readys.next_request().tag.proportion < max_tag) {
967 result.type = NextReqType::returning;
968 result.heap_id = HeapId::ready;
969 return result;
970 }
971
972 // if nothing is schedulable by reservation or
973 // proportion/weight, and if we allow limit break, try to
974 // schedule something with the lowest proportion tag or
975 // alternatively lowest reservation tag.
976 if (allow_limit_break) {
977 if (readys.has_request() &&
978 readys.next_request().tag.proportion < max_tag) {
979 result.type = NextReqType::returning;
980 result.heap_id = HeapId::ready;
981 return result;
982 } else if (reserv.has_request() &&
983 reserv.next_request().tag.reservation < max_tag) {
984 result.type = NextReqType::returning;
985 result.heap_id = HeapId::reservation;
986 return result;
987 }
988 }
989
990 // nothing scheduled; make sure we re-run when next
991 // reservation item or next limited item comes up
992
993 Time next_call = TimeMax;
994 if (resv_heap.top().has_request()) {
995 next_call =
996 min_not_0_time(next_call,
997 resv_heap.top().next_request().tag.reservation);
998 }
999 if (limit_heap.top().has_request()) {
1000 const auto& next = limit_heap.top().next_request();
1001 assert(!next.tag.ready || max_tag == next.tag.proportion);
1002 next_call = min_not_0_time(next_call, next.tag.limit);
1003 }
1004 if (next_call < TimeMax) {
1005 result.type = NextReqType::future;
1006 result.when_ready = next_call;
1007 return result;
1008 } else {
1009 result.type = NextReqType::none;
1010 return result;
1011 }
1012 } // do_next_request
1013
1014
1015 // if possible is not zero and less than current then return it;
1016 // otherwise return current; the idea is we're trying to find
1017 // the minimal time but ignoring zero
1018 static inline const Time& min_not_0_time(const Time& current,
1019 const Time& possible) {
1020 return TimeZero == possible ? current : std::min(current, possible);
1021 }
1022
1023
1024 /*
1025 * This is called regularly by RunEvery. Each time it's called it
1026 * notes the time and the tick counter (a mark point) in a deque.
1027 * It also looks at the deque to find the most recent mark points
1028 * older than erase_age and idle_age. It then walks the client
1029 * map, erasing entries last used before the erase mark point and
1030 * marking idle those last used before the idle mark point.
1031 */
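      /* For example, with the convenience constructors below the
       * defaults are idle_age = 10 minutes, erase_age = 15 minutes,
       * and check_time = 6 minutes, so a client untouched for ten
       * minutes is marked idle and one untouched for fifteen minutes
       * is erased from the map.
       */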
1032 void do_clean() {
1033 TimePoint now = std::chrono::steady_clock::now();
1034 DataGuard g(data_mtx);
1035 clean_mark_points.emplace_back(MarkPoint(now, tick));
1036
1037 // first erase the super-old client records
1038
1039 Counter erase_point = 0;
1040 auto point = clean_mark_points.front();
1041 while (point.first <= now - erase_age) {
1042 erase_point = point.second;
1043 clean_mark_points.pop_front();
1044 point = clean_mark_points.front();
1045 }
1046
1047 Counter idle_point = 0;
1048 for (auto i : clean_mark_points) {
1049 if (i.first <= now - idle_age) {
1050 idle_point = i.second;
1051 } else {
1052 break;
1053 }
1054 }
1055
1056 if (erase_point > 0 || idle_point > 0) {
1057 for (auto i = client_map.begin(); i != client_map.end(); /* empty */) {
1058 auto i2 = i++;
1059 if (erase_point && i2->second->last_tick <= erase_point) {
1060 delete_from_heaps(i2->second);
1061 client_map.erase(i2);
1062 } else if (idle_point && i2->second->last_tick <= idle_point) {
1063 i2->second->idle = true;
1064 }
1065 } // for
1066 } // if
1067 } // do_clean
1068
1069
1070 // data_mtx must be held by caller
1071 template<IndIntruHeapData ClientRec::*C1,typename C2>
1072 void delete_from_heap(ClientRecRef& client,
1073 c::IndIntruHeap<ClientRecRef,ClientRec,C1,C2,B>& heap) {
1074 auto i = heap.rfind(client);
1075 heap.remove(i);
1076 }
1077
1078
1079 // data_mtx must be held by caller
1080 void delete_from_heaps(ClientRecRef& client) {
1081 delete_from_heap(client, resv_heap);
1082#if USE_PROP_HEAP
1083 delete_from_heap(client, prop_heap);
1084#endif
1085 delete_from_heap(client, limit_heap);
1086 delete_from_heap(client, ready_heap);
1087 }
1088 }; // class PriorityQueueBase
1089
1090
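    /* A pull-mode usage sketch (MyRequest, client_info_f, and the
     * client id value are hypothetical):
     *
     *   using Queue = crimson::dmclock::PullPriorityQueue<int,MyRequest>;
     *   Queue q(client_info_f);            // default ages, no limit break
     *   q.add_request(MyRequest{}, 42, ReqParams(1, 1));
     *   auto pr = q.pull_request();
     *   if (pr.is_retn()) {
     *     auto& retn = pr.get_retn();
     *     // service *retn.request for client retn.client in phase retn.phase
     *   } else if (pr.is_future()) {
     *     // nothing ready yet; try again at time pr.getTime()
     *   }
     */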
1091 template<typename C, typename R, uint B=2>
1092 class PullPriorityQueue : public PriorityQueueBase<C,R,B> {
1093 using super = PriorityQueueBase<C,R,B>;
1094
1095 public:
1096
1097 // When a request is pulled, this is the return type.
1098 struct PullReq {
1099 struct Retn {
1100 C client;
1101 typename super::RequestRef request;
1102 PhaseType phase;
1103 };
1104
1105 typename super::NextReqType type;
1106 boost::variant<Retn,Time> data;
1107
1108 bool is_none() const { return type == super::NextReqType::none; }
1109
1110 bool is_retn() const { return type == super::NextReqType::returning; }
1111 Retn& get_retn() {
1112 return boost::get<Retn>(data);
1113 }
1114
1115 bool is_future() const { return type == super::NextReqType::future; }
1116 Time getTime() const { return boost::get<Time>(data); }
1117 };
1118
1119
1120#ifdef PROFILE
1121 ProfileTimer<std::chrono::nanoseconds> pull_request_timer;
1122 ProfileTimer<std::chrono::nanoseconds> add_request_timer;
1123#endif
1124
1125 template<typename Rep, typename Per>
1126 PullPriorityQueue(typename super::ClientInfoFunc _client_info_f,
1127 std::chrono::duration<Rep,Per> _idle_age,
1128 std::chrono::duration<Rep,Per> _erase_age,
1129 std::chrono::duration<Rep,Per> _check_time,
1130 bool _allow_limit_break = false) :
1131 super(_client_info_f,
1132 _idle_age, _erase_age, _check_time,
1133 _allow_limit_break)
1134 {
1135 // empty
1136 }
1137
1138
1139 // pull convenience constructor
1140 PullPriorityQueue(typename super::ClientInfoFunc _client_info_f,
1141 bool _allow_limit_break = false) :
1142 PullPriorityQueue(_client_info_f,
1143 std::chrono::minutes(10),
1144 std::chrono::minutes(15),
1145 std::chrono::minutes(6),
1146 _allow_limit_break)
1147 {
1148 // empty
1149 }
1150
1151
1152 inline void add_request(const R& request,
1153 const C& client_id,
1154 const ReqParams& req_params,
1155 double addl_cost = 0.0) {
1156 add_request(typename super::RequestRef(new R(request)),
1157 client_id,
1158 req_params,
1159 get_time(),
1160 addl_cost);
1161 }
1162
1163
1164 inline void add_request(const R& request,
1165 const C& client_id,
1166 double addl_cost = 0.0) {
1167 static const ReqParams null_req_params;
1168 add_request(typename super::RequestRef(new R(request)),
1169 client_id,
1170 null_req_params,
1171 get_time(),
1172 addl_cost);
1173 }
1174
1175
1176
1177 inline void add_request_time(const R& request,
1178 const C& client_id,
1179 const ReqParams& req_params,
1180 const Time time,
1181 double addl_cost = 0.0) {
1182 add_request(typename super::RequestRef(new R(request)),
1183 client_id,
1184 req_params,
1185 time,
1186 addl_cost);
1187 }
1188
1189
1190 inline void add_request(typename super::RequestRef&& request,
1191 const C& client_id,
1192 const ReqParams& req_params,
1193 double addl_cost = 0.0) {
1194 add_request(std::move(request), client_id, req_params, get_time(), addl_cost);
1195 }
1196
1197
1198 inline void add_request(typename super::RequestRef&& request,
1199 const C& client_id,
1200 double addl_cost = 0.0) {
1201 static const ReqParams null_req_params;
1202 add_request(std::move(request), client_id, null_req_params, get_time(), addl_cost);
1203 }
1204
1205
1206 // this does the work; the versions above provide alternate interfaces
1207 void add_request(typename super::RequestRef&& request,
1208 const C& client_id,
1209 const ReqParams& req_params,
1210 const Time time,
1211 double addl_cost = 0.0) {
1212 typename super::DataGuard g(this->data_mtx);
1213#ifdef PROFILE
1214 add_request_timer.start();
1215#endif
1216 super::do_add_request(std::move(request),
1217 client_id,
1218 req_params,
1219 time,
1220 addl_cost);
1221 // no call to schedule_request for pull version
1222#ifdef PROFILE
1223 add_request_timer.stop();
1224#endif
1225 }
1226
1227
1228 inline PullReq pull_request() {
1229 return pull_request(get_time());
1230 }
1231
1232
1233 PullReq pull_request(Time now) {
1234 PullReq result;
1235 typename super::DataGuard g(this->data_mtx);
1236#ifdef PROFILE
1237 pull_request_timer.start();
1238#endif
1239
1240 typename super::NextReq next = super::do_next_request(now);
1241 result.type = next.type;
1242 switch(next.type) {
1243 case super::NextReqType::none:
1244 return result;
1245 break;
1246 case super::NextReqType::future:
1247 result.data = next.when_ready;
1248 return result;
1249 break;
1250 case super::NextReqType::returning:
1251 // to avoid nesting, break out and let code below handle this case
1252 break;
1253 default:
1254 assert(false);
1255 }
1256
1257 // we'll only get here if we're returning an entry
1258
1259 auto process_f =
1260 [&] (PullReq& pull_result, PhaseType phase) ->
1261 std::function<void(const C&,
1262 typename super::RequestRef&)> {
1263 return [&pull_result, phase](const C& client,
1264 typename super::RequestRef& request) {
1265 pull_result.data =
1266 typename PullReq::Retn{client, std::move(request), phase};
1267 };
1268 };
1269
1270 switch(next.heap_id) {
1271 case super::HeapId::reservation:
1272 super::pop_process_request(this->resv_heap,
1273 process_f(result, PhaseType::reservation));
1274 ++this->reserv_sched_count;
1275 break;
1276 case super::HeapId::ready:
1277 super::pop_process_request(this->ready_heap,
1278 process_f(result, PhaseType::priority));
1279 { // need to use retn temporarily
1280 auto& retn = boost::get<typename PullReq::Retn>(result.data);
1281 super::reduce_reservation_tags(retn.client);
1282 }
1283 ++this->prop_sched_count;
1284 break;
1285 default:
1286 assert(false);
1287 }
1288
1289#ifdef PROFILE
1290 pull_request_timer.stop();
1291#endif
1292 return result;
1293 } // pull_request
1294
1295
1296 protected:
1297
1298
1299 // data_mtx should be held when called; unfortunately this
1300 // function has to be repeated in both push & pull
1301 // specializations
1302 typename super::NextReq next_request() {
1303 return super::do_next_request(get_time());
1304 }
1305 }; // class PullPriorityQueue
1306
1307
1308 // PUSH version
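    /* A push-mode usage sketch (MyRequest, client_info_f, and the
     * server-side helpers are hypothetical); the queue itself calls
     * the handle function whenever the can-handle function allows:
     *
     *   using Queue = crimson::dmclock::PushPriorityQueue<int,MyRequest>;
     *   Queue q(client_info_f,
     *           []() { return server_has_capacity(); },
     *           [](const int& client, Queue::RequestRef req, PhaseType phase) {
     *             service(client, std::move(req), phase);
     *           });
     *   q.add_request(MyRequest{}, 42, ReqParams(1, 1));
     *   // ... and when the server finishes a request:
     *   q.request_completed();
     */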
1309 template<typename C, typename R, uint B=2>
1310 class PushPriorityQueue : public PriorityQueueBase<C,R,B> {
1311
1312 protected:
1313
1314 using super = PriorityQueueBase<C,R,B>;
1315
1316 public:
1317
1318 // a function to see whether the server can handle another request
1319 using CanHandleRequestFunc = std::function<bool(void)>;
1320
1321 // a function to submit a request to the server; the second
1322 // parameter is a callback when it's completed
1323 using HandleRequestFunc =
1324 std::function<void(const C&,typename super::RequestRef,PhaseType)>;
1325
1326 protected:
1327
1328 CanHandleRequestFunc can_handle_f;
1329 HandleRequestFunc handle_f;
1330 // for handling timed scheduling
1331 std::mutex sched_ahead_mtx;
1332 std::condition_variable sched_ahead_cv;
1333 Time sched_ahead_when = TimeZero;
1334
1335#ifdef PROFILE
1336 public:
1337 ProfileTimer<std::chrono::nanoseconds> add_request_timer;
1338 ProfileTimer<std::chrono::nanoseconds> request_complete_timer;
1339 protected:
1340#endif
1341
1342 // NB: threads declared last, so constructed last and destructed first
1343
1344 std::thread sched_ahead_thd;
1345
1346 public:
1347
1348 // push full constructor
1349 template<typename Rep, typename Per>
1350 PushPriorityQueue(typename super::ClientInfoFunc _client_info_f,
1351 CanHandleRequestFunc _can_handle_f,
1352 HandleRequestFunc _handle_f,
1353 std::chrono::duration<Rep,Per> _idle_age,
1354 std::chrono::duration<Rep,Per> _erase_age,
1355 std::chrono::duration<Rep,Per> _check_time,
1356 bool _allow_limit_break = false) :
1357 super(_client_info_f,
1358 _idle_age, _erase_age, _check_time,
1359 _allow_limit_break)
1360 {
1361 can_handle_f = _can_handle_f;
1362 handle_f = _handle_f;
1363 sched_ahead_thd = std::thread(&PushPriorityQueue::run_sched_ahead, this);
1364 }
1365
1366
1367 // push convenience constructor
1368 PushPriorityQueue(typename super::ClientInfoFunc _client_info_f,
1369 CanHandleRequestFunc _can_handle_f,
1370 HandleRequestFunc _handle_f,
1371 bool _allow_limit_break = false) :
1372 PushPriorityQueue(_client_info_f,
1373 _can_handle_f,
1374 _handle_f,
1375 std::chrono::minutes(10),
1376 std::chrono::minutes(15),
1377 std::chrono::minutes(6),
1378 _allow_limit_break)
1379 {
1380 // empty
1381 }
1382
1383
1384 ~PushPriorityQueue() {
1385 this->finishing = true;
1386 sched_ahead_cv.notify_one();
1387 sched_ahead_thd.join();
1388 }
1389
1390 public:
1391
1392 inline void add_request(const R& request,
1393 const C& client_id,
1394 const ReqParams& req_params,
1395 double addl_cost = 0.0) {
1396 add_request(typename super::RequestRef(new R(request)),
1397 client_id,
1398 req_params,
1399 get_time(),
1400 addl_cost);
1401 }
1402
1403
1404 inline void add_request(typename super::RequestRef&& request,
1405 const C& client_id,
1406 const ReqParams& req_params,
1407 double addl_cost = 0.0) {
1408 add_request(std::move(request), client_id, req_params, get_time(), addl_cost);
1409 }
1410
1411
1412 inline void add_request_time(const R& request,
1413 const C& client_id,
1414 const ReqParams& req_params,
1415 const Time time,
1416 double addl_cost = 0.0) {
1417 add_request(typename super::RequestRef(new R(request)),
1418 client_id,
1419 req_params,
1420 time,
1421 addl_cost);
1422 }
1423
1424
1425 void add_request(typename super::RequestRef&& request,
1426 const C& client_id,
1427 const ReqParams& req_params,
1428 const Time time,
1429 double addl_cost = 0.0) {
1430 typename super::DataGuard g(this->data_mtx);
1431#ifdef PROFILE
1432 add_request_timer.start();
1433#endif
1434 super::do_add_request(std::move(request),
1435 client_id,
1436 req_params,
1437 time,
1438 addl_cost);
1439 schedule_request();
1440#ifdef PROFILE
1441 add_request_timer.stop();
1442#endif
1443 }
1444
1445
1446 void request_completed() {
1447 typename super::DataGuard g(this->data_mtx);
1448#ifdef PROFILE
1449 request_complete_timer.start();
1450#endif
1451 schedule_request();
1452#ifdef PROFILE
1453 request_complete_timer.stop();
1454#endif
1455 }
1456
1457 protected:
1458
1459 // data_mtx should be held when called; furthermore, the heap
1460 // should not be empty and the top element of the heap should
1461 // not be already handled
1462 //
1463 // NOTE: the use of "super::ClientRec" in either the template
1464 // construct or as a parameter to submit_top_request generated
1465 // a compiler error in g++ 4.8.4, when ClientRec was
1466 // "protected" rather than "public". By g++ 6.3.1 this was not
1467 // an issue. But for backwards compatibility
1468 // PriorityQueueBase::ClientRec is public.
1469 template<typename C1,
1470 IndIntruHeapData super::ClientRec::*C2,
1471 typename C3,
1472 uint B4>
1473 C submit_top_request(IndIntruHeap<C1,typename super::ClientRec,C2,C3,B4>& heap,
1474 PhaseType phase) {
1475 C client_result;
1476 super::pop_process_request(heap,
1477 [this, phase, &client_result]
1478 (const C& client,
1479 typename super::RequestRef& request) {
1480 client_result = client;
1481 handle_f(client, std::move(request), phase);
1482 });
1483 return client_result;
1484 }
1485
1486
1487 // data_mtx should be held when called
1488 void submit_request(typename super::HeapId heap_id) {
1489 C client;
1490 switch(heap_id) {
1491 case super::HeapId::reservation:
1492 // don't need to note client
1493 (void) submit_top_request(this->resv_heap, PhaseType::reservation);
1494 // unlike the ready case, we do not reduce reservation
1495 // tags here
1496 ++this->reserv_sched_count;
1497 break;
1498 case super::HeapId::ready:
1499 client = submit_top_request(this->ready_heap, PhaseType::priority);
1500 super::reduce_reservation_tags(client);
1501 ++this->prop_sched_count;
1502 break;
1503 default:
1504 assert(false);
1505 }
1506 } // submit_request
1507
1508
1509 // data_mtx should be held when called; unfortunately this
1510 // function has to be repeated in both push & pull
1511 // specializations
1512 typename super::NextReq next_request() {
1513 return next_request(get_time());
1514 }
1515
1516
1517 // data_mtx should be held when called; overrides member
1518 // function in base class to add check for whether a request can
1519 // be pushed to the server
1520 typename super::NextReq next_request(Time now) {
1521 if (!can_handle_f()) {
1522 typename super::NextReq result;
1523 result.type = super::NextReqType::none;
1524 return result;
1525 } else {
1526 return super::do_next_request(now);
1527 }
1528 } // next_request
1529
1530
1531 // data_mtx should be held when called
1532 void schedule_request() {
1533 typename super::NextReq next_req = next_request();
1534 switch (next_req.type) {
1535 case super::NextReqType::none:
1536 return;
1537 case super::NextReqType::future:
1538 sched_at(next_req.when_ready);
1539 break;
1540 case super::NextReqType::returning:
1541 submit_request(next_req.heap_id);
1542 break;
1543 default:
1544 assert(false);
1545 }
1546 }
1547
1548
1549 // this is the thread that handles running schedule_request at
1550 // future times when nothing can be scheduled immediately
1551 void run_sched_ahead() {
1552 std::unique_lock<std::mutex> l(sched_ahead_mtx);
1553
1554 while (!this->finishing) {
1555 if (TimeZero == sched_ahead_when) {
1556 sched_ahead_cv.wait(l);
1557 } else {
1558 Time now;
1559 while (!this->finishing && (now = get_time()) < sched_ahead_when) {
1560 long microseconds_l = long(1 + 1000000 * (sched_ahead_when - now));
1561 auto microseconds = std::chrono::microseconds(microseconds_l);
1562 sched_ahead_cv.wait_for(l, microseconds);
1563 }
1564 sched_ahead_when = TimeZero;
1565 if (this->finishing) return;
1566
1567 l.unlock();
1568 if (!this->finishing) {
1569 typename super::DataGuard g(this->data_mtx);
1570 schedule_request();
1571 }
1572 l.lock();
1573 }
1574 }
1575 }
1576
1577
1578 void sched_at(Time when) {
1579 std::lock_guard<std::mutex> l(sched_ahead_mtx);
1580 if (TimeZero == sched_ahead_when || when < sched_ahead_when) {
1581 sched_ahead_when = when;
1582 sched_ahead_cv.notify_one();
1583 }
1584 }
1585 }; // class PushPriorityQueue
1586
1587 } // namespace dmclock
1588} // namespace crimson