]> git.proxmox.com Git - ceph.git/blame - ceph/src/dmclock/src/dmclock_server.h
buildsys: switch source download to quincy
[ceph.git] / ceph / src / dmclock / src / dmclock_server.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4/*
5 * Copyright (C) 2017 Red Hat Inc.
11fdf7f2
TL
6 *
7 * Author: J. Eric Ivancich <ivancich@redhat.com>
8 *
9 * This is free software; you can redistribute it and/or modify it
10 * under the terms of the GNU Lesser General Public License version
11 * 2.1, as published by the Free Software Foundation. See file
12 * COPYING.
7c673cae
FG
13 */
14
15
16#pragma once
17
18/* COMPILATION OPTIONS
7c673cae
FG
19 *
20 * The prop_heap does not seem to be necessary. The only thing it
11fdf7f2 21 * would help with is quickly finding the minimum proportion/prioity
7c673cae
FG
22 * when an idle client became active. To have the code maintain the
23 * proportional heap, define USE_PROP_HEAP (i.e., compiler argument
24 * -DUSE_PROP_HEAP).
25 */
26
27#include <assert.h>
28
29#include <cmath>
30#include <memory>
31#include <map>
32#include <deque>
33#include <queue>
34#include <atomic>
35#include <mutex>
36#include <condition_variable>
37#include <thread>
38#include <iostream>
39#include <sstream>
40#include <limits>
41
42#include <boost/variant.hpp>
43
44#include "indirect_intrusive_heap.h"
9f95a23c 45#include "../support/src/run_every.h"
7c673cae
FG
46#include "dmclock_util.h"
47#include "dmclock_recs.h"
48
49#ifdef PROFILE
50#include "profile.h"
51#endif
52
7c673cae
FG
53
54namespace crimson {
55
56 namespace dmclock {
57
58 namespace c = crimson;
59
    // Sentinel tag values. When IEEE-754 is available, +/- infinity is
    // used so pinned tags compare above/below every real tag; otherwise
    // fall back to the extreme finite doubles.
    constexpr double max_tag = std::numeric_limits<double>::is_iec559 ?
      std::numeric_limits<double>::infinity() :
      std::numeric_limits<double>::max();
    constexpr double min_tag = std::numeric_limits<double>::is_iec559 ?
      -std::numeric_limits<double>::infinity() :
      std::numeric_limits<double>::lowest();
    // modulus used when formatting tag times for display
    constexpr unsigned tag_modulo = 1000000;

    // default aging/cleaning intervals for the idle-client reaper
    constexpr auto standard_idle_age = std::chrono::seconds(300);
    constexpr auto standard_erase_age = std::chrono::seconds(600);
    constexpr auto standard_check_time = std::chrono::seconds(60);
    constexpr auto aggressive_check_time = std::chrono::seconds(5);
    // default cap on clients erased per cleaning pass
    constexpr unsigned standard_erase_max = 2000;
11fdf7f2
TL
73
    // Policy controlling what the queue does with a request whose limit
    // tag would be exceeded.
    enum class AtLimit {
      // requests are delayed until the limit is restored
      Wait,
      // requests are allowed to exceed their limit, if all other reservations
      // are met and below their limits
      Allow,
      // if an incoming request would exceed its limit, add_request() will
      // reject it with EAGAIN instead of adding it to the queue. cannot be used
      // with DelayedTagCalc, because add_request() needs an accurate tag
      Reject,
    };

    // when AtLimit::Reject is used, only start rejecting requests once their
    // limit is above this threshold. requests under this threshold are
    // enqueued and processed like AtLimit::Wait
    using RejectThreshold = Time;

    // the AtLimit constructor parameter can either accept AtLimit or a value
    // for RejectThreshold (which implies AtLimit::Reject)
    using AtLimitParam = boost::variant<AtLimit, RejectThreshold>;
94
7c673cae 95 struct ClientInfo {
11fdf7f2
TL
96 double reservation; // minimum
97 double weight; // proportional
98 double limit; // maximum
7c673cae
FG
99
100 // multiplicative inverses of above, which we use in calculations
101 // and don't want to recalculate repeatedly
11fdf7f2
TL
102 double reservation_inv;
103 double weight_inv;
104 double limit_inv;
7c673cae
FG
105
106 // order parameters -- min, "normal", max
9f95a23c
TL
107 ClientInfo(double _reservation, double _weight, double _limit) {
108 update(_reservation, _weight, _limit);
109 }
110
111 inline void update(double _reservation, double _weight, double _limit) {
112 reservation = _reservation;
113 weight = _weight;
114 limit = _limit;
115 reservation_inv = (0.0 == reservation) ? 0.0 : 1.0 / reservation;
116 weight_inv = (0.0 == weight) ? 0.0 : 1.0 / weight;
117 limit_inv = (0.0 == limit) ? 0.0 : 1.0 / limit;
7c673cae 118 }
7c673cae
FG
119
120 friend std::ostream& operator<<(std::ostream& out,
121 const ClientInfo& client) {
122 out <<
123 "{ ClientInfo:: r:" << client.reservation <<
124 " w:" << std::fixed << client.weight <<
125 " l:" << std::fixed << client.limit <<
126 " 1/r:" << std::fixed << client.reservation_inv <<
127 " 1/w:" << std::fixed << client.weight_inv <<
128 " 1/l:" << std::fixed << client.limit_inv <<
129 " }";
130 return out;
131 }
132 }; // class ClientInfo
133
134
    // The three dmclock scheduling tags (reservation, proportion, limit)
    // for a single request, plus the bookkeeping (delta/rho, cost,
    // arrival time) needed to derive the next request's tags.
    struct RequestTag {
      double reservation;
      double proportion;
      double limit;
      uint32_t delta;
      uint32_t rho;
      Cost cost;
      bool ready; // true when within limit
      Time arrival;

      // Derive this request's tags from the client's previous tag and its
      // current QoS parameters. anticipation_timeout allows a request that
      // arrives shortly after the previous one to be treated as if it
      // arrived earlier (see max_time adjustment below).
      RequestTag(const RequestTag& prev_tag,
		 const ClientInfo& client,
		 const uint32_t _delta,
		 const uint32_t _rho,
		 const Time time,
		 const Cost _cost = 1u,
		 const double anticipation_timeout = 0.0) :
	delta(_delta),
	rho(_rho),
	cost(_cost),
	ready(false),
	arrival(time)
      {
	assert(cost > 0);
	Time max_time = time;
	// anticipation: back-date the tag when this request closely
	// follows the previous one
	if (time - anticipation_timeout < prev_tag.arrival)
	  max_time -= anticipation_timeout;

	reservation = tag_calc(max_time,
			       prev_tag.reservation,
			       client.reservation_inv,
			       rho,
			       true,
			       cost);
	proportion = tag_calc(max_time,
			      prev_tag.proportion,
			      client.weight_inv,
			      delta,
			      true,
			      cost);
	limit = tag_calc(max_time,
			 prev_tag.limit,
			 client.limit_inv,
			 delta,
			 false,
			 cost);

	// at least one of reservation/weight must be in effect
	assert(reservation < max_tag || proportion < max_tag);
      }

      // Convenience overload taking delta/rho bundled in ReqParams.
      RequestTag(const RequestTag& prev_tag,
		 const ClientInfo& client,
		 const ReqParams req_params,
		 const Time time,
		 const Cost cost = 1u,
		 const double anticipation_timeout = 0.0) :
	RequestTag(prev_tag, client, req_params.delta, req_params.rho, time,
		   cost, anticipation_timeout)
      { /* empty */ }

      // Construct a tag directly from explicit tag values (e.g. a zeroed
      // placeholder before the real tags are calculated).
      RequestTag(const double _res, const double _prop, const double _lim,
		 const Time _arrival,
		 const uint32_t _delta = 0,
		 const uint32_t _rho = 0,
		 const Cost _cost = 1u) :
	reservation(_res),
	proportion(_prop),
	limit(_lim),
	delta(_delta),
	rho(_rho),
	cost(_cost),
	ready(false),
	arrival(_arrival)
      {
	assert(cost > 0);
	assert(reservation < max_tag || proportion < max_tag);
      }

      RequestTag(const RequestTag& other) :
	reservation(other.reservation),
	proportion(other.proportion),
	limit(other.limit),
	delta(other.delta),
	rho(other.rho),
	cost(other.cost),
	ready(other.ready),
	arrival(other.arrival)
      { /* empty */ }

      // "same" if unchanged, otherwise "before=>after" for logging.
      static std::string format_tag_change(double before, double after) {
	if (before == after) {
	  return std::string("same");
	} else {
	  std::stringstream ss;
	  ss << format_tag(before) << "=>" << format_tag(after);
	  return ss.str();
	}
      }

      // Render a tag value, mapping the pinned sentinels to "max"/"min".
      static std::string format_tag(double value) {
	if (max_tag == value) {
	  return std::string("max");
	} else if (min_tag == value) {
	  return std::string("min");
	} else {
	  return format_time(value, tag_modulo);
	}
      }

    private:

      // Core dmclock tag formula: max(now, prev + increment*(delta+cost)).
      // A zero increment means the dimension is disabled; pin the tag to
      // the appropriate extreme so it never wins (or always loses) a
      // comparison.
      static double tag_calc(const Time time,
			     const double prev,
			     const double increment,
			     const uint32_t dist_req_val,
			     const bool extreme_is_high,
			     const Cost cost) {
	if (0.0 == increment) {
	  return extreme_is_high ? max_tag : min_tag;
	} else {
	  // insure 64-bit arithmetic before conversion to double
	  double tag_increment = increment * (uint64_t(dist_req_val) + cost);
	  return std::max(time, prev + tag_increment);
	}
      }

      friend std::ostream& operator<<(std::ostream& out,
				      const RequestTag& tag) {
	out <<
	  "{ RequestTag:: ready:" << (tag.ready ? "true" : "false") <<
	  " r:" << format_tag(tag.reservation) <<
	  " p:" << format_tag(tag.proportion) <<
	  " l:" << format_tag(tag.limit) <<
#if 0 // try to resolve this to make sure Time is operator<<'able.
	  " arrival:" << tag.arrival <<
#endif
	  " }";
	return out;
      }
    }; // class RequestTag
275
11fdf7f2
TL
276 // C is client identifier type, R is request type,
277 // IsDelayed controls whether tag calculation is delayed until the request
278 // reaches the front of its queue. This is an optimization over the
279 // originally published dmclock algorithm, allowing it to use the most
280 // recent values of rho and delta.
281 // U1 determines whether to use client information function dynamically,
282 // B is heap branching factor
9f95a23c 283 template<typename C, typename R, bool IsDelayed, bool U1, unsigned B>
7c673cae 284 class PriorityQueueBase {
d2e6a577
FG
285 // we don't want to include gtest.h just for FRIEND_TEST
286 friend class dmclock_server_client_idle_erase_Test;
7c673cae 287
11fdf7f2
TL
288 // types used for tag dispatch to select between implementations
289 using TagCalc = std::integral_constant<bool, IsDelayed>;
290 using DelayedTagCalc = std::true_type;
291 using ImmediateTagCalc = std::false_type;
292
7c673cae
FG
293 public:
294
295 using RequestRef = std::unique_ptr<R>;
296
297 protected:
298
f67539c2
TL
299 using Clock = std::chrono::steady_clock;
300 using TimePoint = Clock::time_point;
7c673cae
FG
301 using Duration = std::chrono::milliseconds;
302 using MarkPoint = std::pair<TimePoint,Counter>;
303
304 enum class ReadyOption {ignore, lowers, raises};
305
306 // forward decl for friend decls
307 template<double RequestTag::*, ReadyOption, bool>
308 struct ClientCompare;
309
      // One queued request: its scheduling tag, the id of the submitting
      // client, and ownership of the request object itself.
      class ClientReq {
	friend PriorityQueueBase;

	RequestTag tag;
	C client_id;
	RequestRef request;

      public:

	ClientReq(const RequestTag& _tag,
		  const C& _client_id,
		  RequestRef&& _request) :
	  tag(_tag),
	  client_id(_client_id),
	  request(std::move(_request))
	{
	  // empty
	}

	friend std::ostream& operator<<(std::ostream& out, const ClientReq& c) {
	  out << "{ ClientReq:: tag:" << c.tag << " client:" <<
	    c.client_id << " }";
	  return out;
	}
      }; // class ClientReq
335
11fdf7f2
TL
      // Pairs a client id with a request tag; used when tag information
      // must travel without the request object itself.
      struct RequestMeta {
	C client_id;
	RequestTag tag;

	RequestMeta(const C& _client_id, const RequestTag& _tag) :
	  client_id(_client_id),
	  tag(_tag)
	{
	  // empty
	}
      };
347
7c673cae
FG
348 public:
349
350 // NOTE: ClientRec is in the "public" section for compatibility
351 // with g++ 4.8.4, which complains if it's not. By g++ 6.3.1
352 // ClientRec could be "protected" with no issue. [See comments
353 // associated with function submit_top_request.]
      // Per-client record: the client's queued requests, its previous
      // request tag (basis for the next tag calculation), idle-tracking
      // state, and the intrusive-heap hook data for each scheduling heap.
      class ClientRec {
	friend PriorityQueueBase<C,R,IsDelayed,U1,B>;

	C client;
	RequestTag prev_tag;
	std::deque<ClientReq> requests;

	// amount added from the proportion tag as a result of
	// an idle client becoming unidle
	double prop_delta = 0.0;

	// intrusive-heap bookkeeping, one hook per heap this record sits in
	c::IndIntruHeapData reserv_heap_data {};
	c::IndIntruHeapData lim_heap_data {};
	c::IndIntruHeapData ready_heap_data {};
#if USE_PROP_HEAP
	c::IndIntruHeapData prop_heap_data {};
#endif

      public:

	// QoS parameters currently in force for this client
	const ClientInfo* info;
	bool idle;
	// tick of the most recent activity, used by the idle/erase reaper
	Counter last_tick;
	uint32_t cur_rho;
	uint32_t cur_delta;

	ClientRec(C _client,
		  const ClientInfo* _info,
		  Counter current_tick) :
	  client(_client),
	  prev_tag(0.0, 0.0, 0.0, TimeZero),
	  info(_info),
	  idle(true),
	  last_tick(current_tick),
	  cur_rho(1),
	  cur_delta(1)
	{
	  // empty
	}

	inline const RequestTag& get_req_tag() const {
	  return prev_tag;
	}

	// copy rhs into lhs unless rhs is one of the pinned sentinel
	// values (max_tag/min_tag), which must not be propagated
	static inline void assign_unpinned_tag(double& lhs, const double rhs) {
	  if (rhs != max_tag && rhs != min_tag) {
	    lhs = rhs;
	  }
	}

	// record _prev as the basis for this client's next tag
	// calculation and mark the client active at _tick
	inline void update_req_tag(const RequestTag& _prev,
				   const Counter& _tick) {
	  assign_unpinned_tag(prev_tag.reservation, _prev.reservation);
	  assign_unpinned_tag(prev_tag.limit, _prev.limit);
	  assign_unpinned_tag(prev_tag.proportion, _prev.proportion);
	  prev_tag.arrival = _prev.arrival;
	  last_tick = _tick;
	}

	inline void add_request(const RequestTag& tag, RequestRef&& request) {
	  requests.emplace_back(tag, client, std::move(request));
	}

	inline const ClientReq& next_request() const {
	  return requests.front();
	}

	inline ClientReq& next_request() {
	  return requests.front();
	}

	inline void pop_request() {
	  requests.pop_front();
	}

	inline bool has_request() const {
	  return !requests.empty();
	}

	inline size_t request_count() const {
	  return requests.size();
	}

	// Remove (front-to-back) every request the filter accepts; the
	// filter takes ownership of each accepted request. Returns true
	// if anything was removed.
	// NB: because a deque is the underlying structure, this
	// operation might be expensive
	bool remove_by_req_filter_fw(std::function<bool(RequestRef&&)> filter_accum) {
	  bool any_removed = false;
	  for (auto i = requests.begin();
	       i != requests.end();
	       /* no inc */) {
	    if (filter_accum(std::move(i->request))) {
	      any_removed = true;
	      i = requests.erase(i);
	    } else {
	      ++i;
	    }
	  }
	  return any_removed;
	}

	// Same as the _fw variant but visits requests back-to-front.
	// NB: because a deque is the underlying structure, this
	// operation might be expensive
	bool remove_by_req_filter_bw(std::function<bool(RequestRef&&)> filter_accum) {
	  bool any_removed = false;
	  for (auto i = requests.rbegin();
	       i != requests.rend();
	       /* no inc */) {
	    if (filter_accum(std::move(i->request))) {
	      any_removed = true;
	      // convert reverse iterator to the forward iterator erase needs
	      i = decltype(i){ requests.erase(std::next(i).base()) };
	    } else {
	      ++i;
	    }
	  }
	  return any_removed;
	}

	inline bool
	remove_by_req_filter(std::function<bool(RequestRef&&)> filter_accum,
			     bool visit_backwards) {
	  if (visit_backwards) {
	    return remove_by_req_filter_bw(filter_accum);
	  } else {
	    return remove_by_req_filter_fw(filter_accum);
	  }
	}

	friend std::ostream&
	operator<<(std::ostream& out,
		   const typename PriorityQueueBase::ClientRec& e) {
	  out << "{ ClientRec::" <<
	    " client:" << e.client <<
	    " prev_tag:" << e.prev_tag <<
	    " req_count:" << e.requests.size() <<
	    " top_req:";
	  if (e.has_request()) {
	    out << e.next_request();
	  } else {
	    out << "none";
	  }
	  out << " }";

	  return out;
	}
      }; // class ClientRec
499
500 using ClientRecRef = std::shared_ptr<ClientRec>;
501
      // when we try to get the next request, we'll be in one of three
      // situations -- we'll have one to return, have one that can
      // fire in the future, or not have any
      enum class NextReqType { returning, future, none };

      // specifies which queue next request will get popped from
      enum class HeapId { reservation, ready };

      // this is returned from next_req to tell the caller the situation;
      // the union member that is valid depends on `type`: heap_id for
      // returning, when_ready for future, neither for none
      struct NextReq {
	NextReqType type;
	union {
	  HeapId heap_id;
	  Time when_ready;
	};

	inline explicit NextReq() :
	  type(NextReqType::none)
	{ }

	inline NextReq(HeapId _heap_id) :
	  type(NextReqType::returning),
	  heap_id(_heap_id)
	{ }

	inline NextReq(Time _when_ready) :
	  type(NextReqType::future),
	  when_ready(_when_ready)
	{ }

	// calls to this are clearer than calls to the default
	// constructor
	static inline NextReq none() {
	  return NextReq();
	}
      };
538
539
      // a function that can be called to look up client information
      using ClientInfoFunc = std::function<const ClientInfo*(const C&)>;


      // true when no client has a queued request; all clients live in
      // resv_heap, so an empty heap or a request-less top means empty
      bool empty() const {
	DataGuard g(data_mtx);
	return (resv_heap.empty() || ! resv_heap.top().has_request());
      }


      // number of client records currently tracked (including idle ones)
      size_t client_count() const {
	DataGuard g(data_mtx);
	return resv_heap.size();
      }


      // total queued requests summed across all clients
      size_t request_count() const {
	DataGuard g(data_mtx);
	size_t total = 0;
	for (auto i = resv_heap.cbegin(); i != resv_heap.cend(); ++i) {
	  total += i->request_count();
	}
	return total;
      }
564
565
      // Remove from every client all requests accepted by filter_accum
      // (which takes ownership of each removed request), re-adjusting the
      // heaps for each modified client. Returns true if anything was
      // removed.
      bool remove_by_req_filter(std::function<bool(RequestRef&&)> filter_accum,
				bool visit_backwards = false) {
	bool any_removed = false;
	DataGuard g(data_mtx);
	for (auto i : client_map) {
	  bool modified =
	    i.second->remove_by_req_filter(filter_accum, visit_backwards);
	  if (modified) {
	    resv_heap.adjust(*i.second);
	    limit_heap.adjust(*i.second);
	    ready_heap.adjust(*i.second);
#if USE_PROP_HEAP
	    prop_heap.adjust(*i.second);
#endif
	    any_removed = true;
	  }
	}
	return any_removed;
      }
585
586
      // use as a default value when no accumulator is provided
      static void request_sink(RequestRef&& req) {
	// do nothing
      }


      // Remove all queued requests for one client, handing each to
      // `accum` (in reverse order if `reverse`), then re-adjust the
      // heaps. A no-op if the client is unknown.
      void remove_by_client(const C& client,
			    bool reverse = false,
			    std::function<void (RequestRef&&)> accum = request_sink) {
	DataGuard g(data_mtx);

	auto i = client_map.find(client);

	if (i == client_map.end()) return;

	if (reverse) {
	  for (auto j = i->second->requests.rbegin();
	       j != i->second->requests.rend();
	       ++j) {
	    accum(std::move(j->request));
	  }
	} else {
	  for (auto j = i->second->requests.begin();
	       j != i->second->requests.end();
	       ++j) {
	    accum(std::move(j->request));
	  }
	}

	i->second->requests.clear();

	resv_heap.adjust(*i->second);
	limit_heap.adjust(*i->second);
	ready_heap.adjust(*i->second);
#if USE_PROP_HEAP
	prop_heap.adjust(*i->second);
#endif
      }
625
626
      // branching factor of the scheduling heaps (template parameter B)
      unsigned get_heap_branching_factor() const {
	return B;
      }
630
631
11fdf7f2
TL
      // Re-fetch one client's ClientInfo via client_info_f; a no-op if
      // the client is not currently tracked.
      void update_client_info(const C& client_id) {
	DataGuard g(data_mtx);
	auto client_it = client_map.find(client_id);
	if (client_map.end() != client_it) {
	  ClientRec& client = (*client_it->second);
	  client.info = client_info_f(client_id);
	}
      }


      // Re-fetch ClientInfo for every tracked client.
      void update_client_infos() {
	DataGuard g(data_mtx);
	for (auto i : client_map) {
	  i.second->info = client_info_f(i.second->client);
	}
      }
648
649
7c673cae
FG
      // Debug dump of all client records plus the current top of each
      // heap; takes the data mutex for a consistent snapshot.
      friend std::ostream& operator<<(std::ostream& out,
				      const PriorityQueueBase& q) {
	std::lock_guard<decltype(q.data_mtx)> guard(q.data_mtx);

	out << "{ PriorityQueue::";
	for (const auto& c : q.client_map) {
	  out << " { client:" << c.first << ", record:" << *c.second <<
	    " }";
	}
	if (!q.resv_heap.empty()) {
	  const auto& resv = q.resv_heap.top();
	  out << " { reservation_top:" << resv << " }";
	  const auto& ready = q.ready_heap.top();
	  out << " { ready_top:" << ready << " }";
	  const auto& limit = q.limit_heap.top();
	  out << " { limit_top:" << limit << " }";
	} else {
	  out << " HEAPS-EMPTY";
	}
	out << " }";

	return out;
      }
673
      // for debugging -- dump each requested heap in sorted order
      void display_queues(std::ostream& out,
			  bool show_res = true,
			  bool show_lim = true,
			  bool show_ready = true,
			  bool show_prop = true) const {
	// accept every record; display_sorted requires a filter
	auto filter = [](const ClientRec& e)->bool { return true; };
	DataGuard g(data_mtx);
	if (show_res) {
	  resv_heap.display_sorted(out << "RESER:", filter);
	}
	if (show_lim) {
	  limit_heap.display_sorted(out << "LIMIT:", filter);
	}
	if (show_ready) {
	  ready_heap.display_sorted(out << "READY:", filter);
	}
#if USE_PROP_HEAP
	if (show_prop) {
	  prop_heap.display_sorted(out << "PROPO:", filter);
	}
#endif
      } // display_queues
697
698
699 protected:
700
      // The ClientCompare functor is essentially doing a precedes?
      // operator, returning true if and only if the first parameter
      // must precede the second parameter. If the second must precede
      // the first, or if they are equivalent, false should be
      // returned. The reason for this behavior is that it will be
      // called to test if two items are out of order and if true is
      // returned it will reverse the items. Therefore false is the
      // default return when it doesn't matter to prevent unnecessary
      // re-ordering.
      //
      // The template is supporting variations in sorting based on the
      // heap in question and allowing these variations to be handled
      // at compile-time.
      //
      // tag_field determines which tag is being used for comparison
      //
      // ready_opt determines how the ready flag influences the sort
      //
      // use_prop_delta determines whether the proportional delta is
      // added in for comparison
      template<double RequestTag::*tag_field,
	       ReadyOption ready_opt,
	       bool use_prop_delta>
      struct ClientCompare {
	bool operator()(const ClientRec& n1, const ClientRec& n2) const {
	  if (n1.has_request()) {
	    if (n2.has_request()) {
	      // both clients have a request; compare their front tags
	      const auto& t1 = n1.next_request().tag;
	      const auto& t2 = n2.next_request().tag;
	      if (ReadyOption::ignore == ready_opt || t1.ready == t2.ready) {
		// if we don't care about ready or the ready values are the same
		if (use_prop_delta) {
		  return (t1.*tag_field + n1.prop_delta) <
		    (t2.*tag_field + n2.prop_delta);
		} else {
		  return t1.*tag_field < t2.*tag_field;
		}
	      } else if (ReadyOption::raises == ready_opt) {
		// use_ready == true && the ready fields are different
		return t1.ready;
	      } else {
		return t2.ready;
	      }
	    } else {
	      // n1 has request but n2 does not
	      return true;
	    }
	  } else if (n2.has_request()) {
	    // n2 has request but n1 does not
	    return false;
	  } else {
	    // both have none; keep stable w false
	    return false;
	  }
	}
      };
757
11fdf7f2
TL
      // callback used to (re)fetch a client's QoS parameters
      ClientInfoFunc client_info_f;
      // when true, client_info_f is re-invoked on every tag calculation
      static constexpr bool is_dynamic_cli_info_f = U1;

      // guards all mutable queue state below
      mutable std::mutex data_mtx;
      using DataGuard = std::lock_guard<decltype(data_mtx)>;

      // stable mapping between client ids and client queues
      std::map<C,ClientRecRef> client_map;

      // one intrusive heap per scheduling dimension; each ClientRec sits
      // in all of them simultaneously via its per-heap hook data
      c::IndIntruHeap<ClientRecRef,
		      ClientRec,
		      &ClientRec::reserv_heap_data,
		      ClientCompare<&RequestTag::reservation,
				    ReadyOption::ignore,
				    false>,
		      B> resv_heap;
#if USE_PROP_HEAP
      c::IndIntruHeap<ClientRecRef,
		      ClientRec,
		      &ClientRec::prop_heap_data,
		      ClientCompare<&RequestTag::proportion,
				    ReadyOption::ignore,
				    true>,
		      B> prop_heap;
#endif
      c::IndIntruHeap<ClientRecRef,
		      ClientRec,
		      &ClientRec::lim_heap_data,
		      ClientCompare<&RequestTag::limit,
				    ReadyOption::lowers,
				    false>,
		      B> limit_heap;
      c::IndIntruHeap<ClientRecRef,
		      ClientRec,
		      &ClientRec::ready_heap_data,
		      ClientCompare<&RequestTag::proportion,
				    ReadyOption::raises,
				    true>,
		      B> ready_heap;

      // behavior when a request would exceed its client's limit
      AtLimit at_limit;
      // threshold below which AtLimit::Reject behaves like Wait
      RejectThreshold reject_threshold = 0;

      double anticipation_timeout;

      std::atomic_bool finishing;

      // every request creates a tick
      Counter tick = 0;

      // performance data collection
      size_t reserv_sched_count = 0;
      size_t prop_sched_count = 0;
      size_t limit_break_sched_count = 0;

      // aging parameters for the idle/erase cleaning pass
      Duration idle_age;
      Duration erase_age;
      Duration check_time;
      std::deque<MarkPoint> clean_mark_points;
      // max number of clients to erase at a time
      Counter erase_max;
      // unfinished last erase point
      Counter last_erase_point = 0;

      // NB: All threads declared at end, so they're destructed first!

      std::unique_ptr<RunEvery> cleaning_job;

      // helper function to return the value of a variant if it matches the
      // given type T, or a default value of T otherwise
      template <typename T, typename Variant>
      static T get_or_default(const Variant& param, T default_value) {
	const T *p = boost::get<T>(&param);
	return p ? *p : default_value;
      }
7c673cae
FG
833
      // COMMON constructor that others feed into; we can accept three
      // different variations of durations
      //
      // at_limit_param: either an AtLimit policy or a RejectThreshold
      // value (which implies AtLimit::Reject). Starts the periodic
      // cleaning job that ages out idle clients.
      template<typename Rep, typename Per>
      PriorityQueueBase(ClientInfoFunc _client_info_f,
			std::chrono::duration<Rep,Per> _idle_age,
			std::chrono::duration<Rep,Per> _erase_age,
			std::chrono::duration<Rep,Per> _check_time,
			AtLimitParam at_limit_param,
			double _anticipation_timeout) :
	client_info_f(_client_info_f),
	at_limit(get_or_default(at_limit_param, AtLimit::Reject)),
	reject_threshold(get_or_default(at_limit_param, RejectThreshold{0})),
	anticipation_timeout(_anticipation_timeout),
	finishing(false),
	idle_age(std::chrono::duration_cast<Duration>(_idle_age)),
	erase_age(std::chrono::duration_cast<Duration>(_erase_age)),
	check_time(std::chrono::duration_cast<Duration>(_check_time)),
	erase_max(standard_erase_max)
      {
	assert(_erase_age >= _idle_age);
	assert(_check_time < _idle_age);
	// AtLimit::Reject depends on ImmediateTagCalc
	assert(at_limit != AtLimit::Reject || !IsDelayed);
	cleaning_job =
	  std::unique_ptr<RunEvery>(
	    new RunEvery(check_time,
			 std::bind(&PriorityQueueBase::do_clean, this)));
      }


      // signal the cleaning job (destructed after us) to stop
      ~PriorityQueueBase() {
	finishing = true;
      }
867
868
11fdf7f2
TL
      // Return the client's ClientInfo, re-fetching it first via
      // client_info_f when the queue was built in dynamic mode (U1).
      inline const ClientInfo* get_cli_info(ClientRec& client) const {
	if (is_dynamic_cli_info_f) {
	  client.info = client_info_f(client.client);
	}
	return client.info;
      }
875
      // data_mtx must be held by caller
      //
      // Delayed-calculation variant: a real tag is only computed when
      // the request will sit at the front of the client's queue;
      // otherwise a zeroed placeholder is returned and the tag is
      // computed later (see update_next_tag).
      RequestTag initial_tag(DelayedTagCalc delayed, ClientRec& client,
			     const ReqParams& params, Time time, Cost cost) {
	RequestTag tag(0, 0, 0, time, 0, 0, cost);

	// only calculate a tag if the request is going straight to the front
	if (!client.has_request()) {
	  const ClientInfo* client_info = get_cli_info(client);
	  assert(client_info);
	  tag = RequestTag(client.get_req_tag(), *client_info,
			   params, time, cost, anticipation_timeout);

	  // copy tag to previous tag for client
	  client.update_req_tag(tag, tick);
	}
	return tag;
      }

      // data_mtx must be held by caller
      //
      // Immediate-calculation variant: always computes the real tag at
      // enqueue time (required by AtLimit::Reject).
      RequestTag initial_tag(ImmediateTagCalc imm, ClientRec& client,
			     const ReqParams& params, Time time, Cost cost) {
	// calculate the tag unconditionally
	const ClientInfo* client_info = get_cli_info(client);
	assert(client_info);
	RequestTag tag(client.get_req_tag(), *client_info,
		       params, time, cost, anticipation_timeout);

	// copy tag to previous tag for client
	client.update_req_tag(tag, tick);
	return tag;
      }
907
      // data_mtx must be held by caller. returns 0 on success. when using
      // AtLimit::Reject, requests that would exceed their limit are rejected
      // with EAGAIN, and the queue will not take ownership of the given
      // 'request' argument
      int do_add_request(RequestRef&& request,
			 const C& client_id,
			 const ReqParams& req_params,
			 const Time time,
			 const Cost cost = 1u) {
	++tick;

	auto insert = client_map.emplace(client_id, ClientRecRef{});
	if (insert.second) {
	  // new client entry
	  const ClientInfo* info = client_info_f(client_id);
	  auto client_rec = std::make_shared<ClientRec>(client_id, info, tick);
	  resv_heap.push(client_rec);
#if USE_PROP_HEAP
	  prop_heap.push(client_rec);
#endif
	  limit_heap.push(client_rec);
	  ready_heap.push(client_rec);
	  insert.first->second = std::move(client_rec);
	}

	// for convenience, we'll create a reference to the shared pointer
	ClientRec& client = *insert.first->second;

	if (client.idle) {
	  // We need to do an adjustment so that idle clients compete
	  // fairly on proportional tags since those tags may have
	  // drifted from real-time. Either use the lowest existing
	  // proportion tag -- O(1) -- or the client with the lowest
	  // previous proportion tag -- O(n) where n = # clients.
	  //
	  // So we don't have to maintain a proportional queue that
	  // keeps the minimum on proportional tag alone (we're
	  // instead using a ready queue), we'll have to check each
	  // client.
	  //
	  // The alternative would be to maintain a proportional queue
	  // (define USE_PROP_TAG) and do an O(1) operation here.

	  // Was unable to confirm whether equality testing on
	  // std::numeric_limits<double>::max() is guaranteed, so
	  // we'll use a compile-time calculated trigger that is one
	  // third the max, which should be much larger than any
	  // expected organic value.
	  constexpr double lowest_prop_tag_trigger =
	    std::numeric_limits<double>::max() / 3.0;

	  double lowest_prop_tag = std::numeric_limits<double>::max();
	  for (auto const &c : client_map) {
	    // don't use ourselves (or anything else that might be
	    // listed as idle) since we're now in the map
	    if (!c.second->idle) {
	      double p;
	      // use either lowest proportion tag or previous proportion tag
	      if (c.second->has_request()) {
		p = c.second->next_request().tag.proportion +
		  c.second->prop_delta;
	      } else {
		p = c.second->get_req_tag().proportion + c.second->prop_delta;
	      }

	      if (p < lowest_prop_tag) {
		lowest_prop_tag = p;
	      }
	    }
	  }

	  // if this conditional does not fire, it means no non-idle
	  // client supplied a usable proportion tag, so no delta
	  // adjustment is applied
	  if (lowest_prop_tag < lowest_prop_tag_trigger) {
	    client.prop_delta = lowest_prop_tag - time;
	  }
	  client.idle = false;
	} // if this client was idle

	RequestTag tag = initial_tag(TagCalc{}, client, req_params, time, cost);

	if (at_limit == AtLimit::Reject &&
	    tag.limit > time + reject_threshold) {
	  // if the client is over its limit, reject it here
	  return EAGAIN;
	}

	client.add_request(tag, std::move(request));
	if (1 == client.requests.size()) {
	  // NB: can the following 4 calls to adjust be changed
	  // promote? Can adding a request ever demote a client in the
	  // heaps?
	  resv_heap.adjust(client);
	  limit_heap.adjust(client);
	  ready_heap.adjust(client);
#if USE_PROP_HEAP
	  prop_heap.adjust(client);
#endif
	}

	client.cur_rho = req_params.rho;
	client.cur_delta = req_params.delta;

	resv_heap.adjust(client);
	limit_heap.adjust(client);
	ready_heap.adjust(client);
#if USE_PROP_HEAP
	prop_heap.adjust(client);
#endif
	return 0;
      } // do_add_request
1018
      // data_mtx must be held by caller
      //
      // Delayed variant: now that a request has been popped, compute the
      // real tag for the new front request (using the client's most
      // recent delta/rho) based on the tag just popped.
      void update_next_tag(DelayedTagCalc delayed, ClientRec& top,
			   const RequestTag& tag) {
	if (top.has_request()) {
	  // perform delayed tag calculation on the next request
	  ClientReq& next_first = top.next_request();
	  const ClientInfo* client_info = get_cli_info(top);
	  assert(client_info);
	  next_first.tag = RequestTag(tag, *client_info,
				      top.cur_delta, top.cur_rho,
				      next_first.tag.arrival,
				      next_first.tag.cost,
				      anticipation_timeout);
	  // copy tag to previous tag for client
	  top.update_req_tag(next_first.tag, tick);
	}
      }

      // Immediate variant: nothing to do after a pop.
      void update_next_tag(ImmediateTagCalc imm, ClientRec& top,
			   const RequestTag& tag) {
	// the next tag was already calculated on insertion
      }
7c673cae
FG
1041
      // data_mtx should be held when called; top of heap should have
      // a ready request
      //
      // Pops the front request of the top client of `heap`, fixes up the
      // next tag (delayed mode) and all heaps, then hands the request to
      // `process`. Returns the popped request's tag.
      template<typename C1, IndIntruHeapData ClientRec::*C2, typename C3>
      RequestTag pop_process_request(IndIntruHeap<C1, ClientRec, C2, C3, B>& heap,
				     std::function<void(const C& client,
							const Cost cost,
							RequestRef& request)> process) {
	// gain access to data
	ClientRec& top = heap.top();

	// take what we need before the pop invalidates the front entry
	Cost request_cost = top.next_request().tag.cost;
	RequestRef request = std::move(top.next_request().request);
	RequestTag tag = top.next_request().tag;

	// pop request and adjust heaps
	top.pop_request();

	update_next_tag(TagCalc{}, top, tag);

	resv_heap.demote(top);
	limit_heap.adjust(top);
#if USE_PROP_HEAP
	prop_heap.demote(top);
#endif
	ready_heap.demote(top);

	// process
	process(top.client, request_cost, request);

	return tag;
      } // pop_process_request
1073
1074
11fdf7f2
TL
1075 // data_mtx must be held by caller
1076 void reduce_reservation_tags(DelayedTagCalc delayed, ClientRec& client,
1077 const RequestTag& tag) {
1078 if (!client.requests.empty()) {
1079 // only maintain a tag for the first request
1080 auto& r = client.requests.front();
1081 r.tag.reservation -=
1082 client.info->reservation_inv * std::max(uint32_t(1), tag.rho);
1083 }
1084 }
1085
7c673cae 1086 // data_mtx should be held when called
11fdf7f2
TL
1087 void reduce_reservation_tags(ImmediateTagCalc imm, ClientRec& client,
1088 const RequestTag& tag) {
1089 double res_offset =
1090 client.info->reservation_inv * std::max(uint32_t(1), tag.rho);
7c673cae 1091 for (auto& r : client.requests) {
11fdf7f2 1092 r.tag.reservation -= res_offset;
7c673cae 1093 }
7c673cae
FG
1094 }
1095
7c673cae 1096 // data_mtx should be held when called
11fdf7f2 1097 void reduce_reservation_tags(const C& client_id, const RequestTag& tag) {
7c673cae
FG
1098 auto client_it = client_map.find(client_id);
1099
1100 // means the client was cleaned from map; should never happen
1101 // as long as cleaning times are long enough
1102 assert(client_map.end() != client_it);
11fdf7f2
TL
1103 ClientRec& client = *client_it->second;
1104 reduce_reservation_tags(TagCalc{}, client, tag);
1105
1106 // don't forget to update previous tag
1107 client.prev_tag.reservation -=
1108 client.info->reservation_inv * std::max(uint32_t(1), tag.rho);
1109 resv_heap.promote(client);
7c673cae
FG
1110 }
1111
1112
1113 // data_mtx should be held when called
1114 NextReq do_next_request(Time now) {
11fdf7f2
TL
1115 // if reservation queue is empty, all are empty (i.e., no
1116 // active clients)
7c673cae 1117 if(resv_heap.empty()) {
11fdf7f2 1118 return NextReq::none();
7c673cae
FG
1119 }
1120
1121 // try constraint (reservation) based scheduling
1122
1123 auto& reserv = resv_heap.top();
1124 if (reserv.has_request() &&
1125 reserv.next_request().tag.reservation <= now) {
11fdf7f2 1126 return NextReq(HeapId::reservation);
7c673cae
FG
1127 }
1128
1129 // no existing reservations before now, so try weight-based
1130 // scheduling
1131
1132 // all items that are within limit are eligible based on
1133 // priority
1134 auto limits = &limit_heap.top();
1135 while (limits->has_request() &&
1136 !limits->next_request().tag.ready &&
1137 limits->next_request().tag.limit <= now) {
1138 limits->next_request().tag.ready = true;
1139 ready_heap.promote(*limits);
1140 limit_heap.demote(*limits);
1141
1142 limits = &limit_heap.top();
1143 }
1144
1145 auto& readys = ready_heap.top();
1146 if (readys.has_request() &&
1147 readys.next_request().tag.ready &&
1148 readys.next_request().tag.proportion < max_tag) {
11fdf7f2 1149 return NextReq(HeapId::ready);
7c673cae
FG
1150 }
1151
1152 // if nothing is schedulable by reservation or
1153 // proportion/weight, and if we allow limit break, try to
1154 // schedule something with the lowest proportion tag or
1155 // alternatively lowest reservation tag.
11fdf7f2 1156 if (at_limit == AtLimit::Allow) {
7c673cae
FG
1157 if (readys.has_request() &&
1158 readys.next_request().tag.proportion < max_tag) {
11fdf7f2 1159 return NextReq(HeapId::ready);
7c673cae
FG
1160 } else if (reserv.has_request() &&
1161 reserv.next_request().tag.reservation < max_tag) {
11fdf7f2 1162 return NextReq(HeapId::reservation);
7c673cae
FG
1163 }
1164 }
1165
1166 // nothing scheduled; make sure we re-run when next
1167 // reservation item or next limited item comes up
1168
1169 Time next_call = TimeMax;
1170 if (resv_heap.top().has_request()) {
1171 next_call =
1172 min_not_0_time(next_call,
1173 resv_heap.top().next_request().tag.reservation);
1174 }
1175 if (limit_heap.top().has_request()) {
1176 const auto& next = limit_heap.top().next_request();
1177 assert(!next.tag.ready || max_tag == next.tag.proportion);
1178 next_call = min_not_0_time(next_call, next.tag.limit);
1179 }
1180 if (next_call < TimeMax) {
11fdf7f2 1181 return NextReq(next_call);
7c673cae 1182 } else {
11fdf7f2 1183 return NextReq::none();
7c673cae
FG
1184 }
1185 } // do_next_request
1186
1187
1188 // if possible is not zero and less than current then return it;
1189 // otherwise return current; the idea is we're trying to find
1190 // the minimal time but ignoring zero
1191 static inline const Time& min_not_0_time(const Time& current,
1192 const Time& possible) {
1193 return TimeZero == possible ? current : std::min(current, possible);
1194 }
1195
1196
1197 /*
1198 * This is being called regularly by RunEvery. Every time it's
1199 * called it notes the time and delta counter (mark point) in a
1200 * deque. It also looks at the deque to find the most recent
1201 * mark point that is older than clean_age. It then walks the
1202 * map and delete all server entries that were last used before
1203 * that mark point.
1204 */
1205 void do_clean() {
1206 TimePoint now = std::chrono::steady_clock::now();
1207 DataGuard g(data_mtx);
1208 clean_mark_points.emplace_back(MarkPoint(now, tick));
1209
1210 // first erase the super-old client records
1211
11fdf7f2 1212 Counter erase_point = last_erase_point;
7c673cae
FG
1213 auto point = clean_mark_points.front();
1214 while (point.first <= now - erase_age) {
11fdf7f2
TL
1215 last_erase_point = point.second;
1216 erase_point = last_erase_point;
7c673cae
FG
1217 clean_mark_points.pop_front();
1218 point = clean_mark_points.front();
1219 }
1220
1221 Counter idle_point = 0;
1222 for (auto i : clean_mark_points) {
1223 if (i.first <= now - idle_age) {
1224 idle_point = i.second;
1225 } else {
1226 break;
1227 }
1228 }
1229
11fdf7f2 1230 Counter erased_num = 0;
7c673cae
FG
1231 if (erase_point > 0 || idle_point > 0) {
1232 for (auto i = client_map.begin(); i != client_map.end(); /* empty */) {
1233 auto i2 = i++;
11fdf7f2
TL
1234 if (erase_point &&
1235 erased_num < erase_max &&
1236 i2->second->last_tick <= erase_point) {
7c673cae
FG
1237 delete_from_heaps(i2->second);
1238 client_map.erase(i2);
11fdf7f2 1239 erased_num++;
7c673cae
FG
1240 } else if (idle_point && i2->second->last_tick <= idle_point) {
1241 i2->second->idle = true;
1242 }
1243 } // for
11fdf7f2
TL
1244
1245 auto wperiod = check_time;
1246 if (erased_num >= erase_max) {
1247 wperiod = duration_cast<milliseconds>(aggressive_check_time);
1248 } else {
1249 // clean finished, refresh
1250 last_erase_point = 0;
1251 }
1252 cleaning_job->try_update(wperiod);
7c673cae
FG
1253 } // if
1254 } // do_clean
1255
1256
1257 // data_mtx must be held by caller
1258 template<IndIntruHeapData ClientRec::*C1,typename C2>
1259 void delete_from_heap(ClientRecRef& client,
1260 c::IndIntruHeap<ClientRecRef,ClientRec,C1,C2,B>& heap) {
11fdf7f2 1261 auto i = heap.at(client);
7c673cae
FG
1262 heap.remove(i);
1263 }
1264
1265
1266 // data_mtx must be held by caller
1267 void delete_from_heaps(ClientRecRef& client) {
1268 delete_from_heap(client, resv_heap);
1269#if USE_PROP_HEAP
1270 delete_from_heap(client, prop_heap);
1271#endif
1272 delete_from_heap(client, limit_heap);
1273 delete_from_heap(client, ready_heap);
1274 }
1275 }; // class PriorityQueueBase
1276
1277
9f95a23c 1278 template<typename C, typename R, bool IsDelayed=false, bool U1=false, unsigned B=2>
11fdf7f2
TL
1279 class PullPriorityQueue : public PriorityQueueBase<C,R,IsDelayed,U1,B> {
1280 using super = PriorityQueueBase<C,R,IsDelayed,U1,B>;
7c673cae
FG
1281
1282 public:
1283
1284 // When a request is pulled, this is the return type.
1285 struct PullReq {
1286 struct Retn {
11fdf7f2
TL
1287 C client;
1288 typename super::RequestRef request;
1289 PhaseType phase;
1290 Cost cost;
7c673cae
FG
1291 };
1292
1293 typename super::NextReqType type;
1294 boost::variant<Retn,Time> data;
1295
1296 bool is_none() const { return type == super::NextReqType::none; }
1297
1298 bool is_retn() const { return type == super::NextReqType::returning; }
1299 Retn& get_retn() {
1300 return boost::get<Retn>(data);
1301 }
1302
1303 bool is_future() const { return type == super::NextReqType::future; }
1304 Time getTime() const { return boost::get<Time>(data); }
1305 };
1306
1307
1308#ifdef PROFILE
1309 ProfileTimer<std::chrono::nanoseconds> pull_request_timer;
1310 ProfileTimer<std::chrono::nanoseconds> add_request_timer;
1311#endif
1312
1313 template<typename Rep, typename Per>
1314 PullPriorityQueue(typename super::ClientInfoFunc _client_info_f,
1315 std::chrono::duration<Rep,Per> _idle_age,
1316 std::chrono::duration<Rep,Per> _erase_age,
1317 std::chrono::duration<Rep,Per> _check_time,
11fdf7f2
TL
1318 AtLimitParam at_limit_param = AtLimit::Wait,
1319 double _anticipation_timeout = 0.0) :
7c673cae
FG
1320 super(_client_info_f,
1321 _idle_age, _erase_age, _check_time,
11fdf7f2 1322 at_limit_param, _anticipation_timeout)
7c673cae
FG
1323 {
1324 // empty
1325 }
1326
1327
1328 // pull convenience constructor
1329 PullPriorityQueue(typename super::ClientInfoFunc _client_info_f,
11fdf7f2
TL
1330 AtLimitParam at_limit_param = AtLimit::Wait,
1331 double _anticipation_timeout = 0.0) :
7c673cae 1332 PullPriorityQueue(_client_info_f,
11fdf7f2
TL
1333 standard_idle_age,
1334 standard_erase_age,
1335 standard_check_time,
1336 at_limit_param,
1337 _anticipation_timeout)
7c673cae
FG
1338 {
1339 // empty
1340 }
1341
1342
11fdf7f2
TL
1343 int add_request(R&& request,
1344 const C& client_id,
1345 const ReqParams& req_params,
1346 const Cost cost = 1u) {
1347 return add_request(typename super::RequestRef(new R(std::move(request))),
1348 client_id,
1349 req_params,
1350 get_time(),
1351 cost);
7c673cae
FG
1352 }
1353
1354
11fdf7f2
TL
1355 int add_request(R&& request,
1356 const C& client_id,
1357 const Cost cost = 1u) {
7c673cae 1358 static const ReqParams null_req_params;
11fdf7f2
TL
1359 return add_request(typename super::RequestRef(new R(std::move(request))),
1360 client_id,
1361 null_req_params,
1362 get_time(),
1363 cost);
7c673cae
FG
1364 }
1365
1366
11fdf7f2
TL
1367 int add_request_time(R&& request,
1368 const C& client_id,
1369 const ReqParams& req_params,
1370 const Time time,
1371 const Cost cost = 1u) {
1372 return add_request(typename super::RequestRef(new R(std::move(request))),
1373 client_id,
1374 req_params,
1375 time,
1376 cost);
7c673cae
FG
1377 }
1378
1379
11fdf7f2
TL
1380 int add_request(typename super::RequestRef&& request,
1381 const C& client_id,
1382 const ReqParams& req_params,
1383 const Cost cost = 1u) {
1384 return add_request(request, req_params, client_id, get_time(), cost);
7c673cae
FG
1385 }
1386
1387
11fdf7f2
TL
1388 int add_request(typename super::RequestRef&& request,
1389 const C& client_id,
1390 const Cost cost = 1u) {
7c673cae 1391 static const ReqParams null_req_params;
11fdf7f2 1392 return add_request(request, null_req_params, client_id, get_time(), cost);
7c673cae
FG
1393 }
1394
1395
1396 // this does the work; the versions above provide alternate interfaces
11fdf7f2
TL
1397 int add_request(typename super::RequestRef&& request,
1398 const C& client_id,
1399 const ReqParams& req_params,
1400 const Time time,
1401 const Cost cost = 1u) {
7c673cae
FG
1402 typename super::DataGuard g(this->data_mtx);
1403#ifdef PROFILE
1404 add_request_timer.start();
1405#endif
11fdf7f2
TL
1406 int r = super::do_add_request(std::move(request),
1407 client_id,
1408 req_params,
1409 time,
1410 cost);
7c673cae
FG
1411 // no call to schedule_request for pull version
1412#ifdef PROFILE
1413 add_request_timer.stop();
1414#endif
11fdf7f2 1415 return r;
7c673cae
FG
1416 }
1417
1418
1419 inline PullReq pull_request() {
1420 return pull_request(get_time());
1421 }
1422
1423
11fdf7f2 1424 PullReq pull_request(const Time now) {
7c673cae
FG
1425 PullReq result;
1426 typename super::DataGuard g(this->data_mtx);
1427#ifdef PROFILE
1428 pull_request_timer.start();
1429#endif
1430
1431 typename super::NextReq next = super::do_next_request(now);
1432 result.type = next.type;
1433 switch(next.type) {
1434 case super::NextReqType::none:
1435 return result;
7c673cae
FG
1436 case super::NextReqType::future:
1437 result.data = next.when_ready;
1438 return result;
7c673cae
FG
1439 case super::NextReqType::returning:
1440 // to avoid nesting, break out and let code below handle this case
1441 break;
1442 default:
1443 assert(false);
1444 }
1445
1446 // we'll only get here if we're returning an entry
1447
1448 auto process_f =
1449 [&] (PullReq& pull_result, PhaseType phase) ->
1450 std::function<void(const C&,
11fdf7f2 1451 uint64_t,
7c673cae
FG
1452 typename super::RequestRef&)> {
1453 return [&pull_result, phase](const C& client,
11fdf7f2 1454 const Cost request_cost,
7c673cae 1455 typename super::RequestRef& request) {
11fdf7f2
TL
1456 pull_result.data = typename PullReq::Retn{ client,
1457 std::move(request),
1458 phase,
1459 request_cost };
7c673cae
FG
1460 };
1461 };
1462
1463 switch(next.heap_id) {
1464 case super::HeapId::reservation:
11fdf7f2
TL
1465 (void) super::pop_process_request(this->resv_heap,
1466 process_f(result,
1467 PhaseType::reservation));
7c673cae
FG
1468 ++this->reserv_sched_count;
1469 break;
1470 case super::HeapId::ready:
11fdf7f2
TL
1471 {
1472 auto tag = super::pop_process_request(this->ready_heap,
7c673cae 1473 process_f(result, PhaseType::priority));
11fdf7f2 1474 // need to use retn temporarily
7c673cae 1475 auto& retn = boost::get<typename PullReq::Retn>(result.data);
11fdf7f2 1476 super::reduce_reservation_tags(retn.client, tag);
7c673cae
FG
1477 }
1478 ++this->prop_sched_count;
1479 break;
1480 default:
1481 assert(false);
1482 }
1483
1484#ifdef PROFILE
1485 pull_request_timer.stop();
1486#endif
1487 return result;
1488 } // pull_request
1489
1490
1491 protected:
1492
1493
1494 // data_mtx should be held when called; unfortunately this
1495 // function has to be repeated in both push & pull
1496 // specializations
1497 typename super::NextReq next_request() {
1498 return next_request(get_time());
1499 }
1500 }; // class PullPriorityQueue
1501
1502
1503 // PUSH version
9f95a23c 1504 template<typename C, typename R, bool IsDelayed=false, bool U1=false, unsigned B=2>
11fdf7f2 1505 class PushPriorityQueue : public PriorityQueueBase<C,R,IsDelayed,U1,B> {
7c673cae
FG
1506
1507 protected:
1508
11fdf7f2 1509 using super = PriorityQueueBase<C,R,IsDelayed,U1,B>;
7c673cae
FG
1510
1511 public:
1512
1513 // a function to see whether the server can handle another request
1514 using CanHandleRequestFunc = std::function<bool(void)>;
1515
1516 // a function to submit a request to the server; the second
1517 // parameter is a callback when it's completed
1518 using HandleRequestFunc =
11fdf7f2 1519 std::function<void(const C&,typename super::RequestRef,PhaseType,uint64_t)>;
7c673cae
FG
1520
1521 protected:
1522
1523 CanHandleRequestFunc can_handle_f;
1524 HandleRequestFunc handle_f;
1525 // for handling timed scheduling
1526 std::mutex sched_ahead_mtx;
1527 std::condition_variable sched_ahead_cv;
1528 Time sched_ahead_when = TimeZero;
1529
1530#ifdef PROFILE
1531 public:
1532 ProfileTimer<std::chrono::nanoseconds> add_request_timer;
1533 ProfileTimer<std::chrono::nanoseconds> request_complete_timer;
1534 protected:
1535#endif
1536
1537 // NB: threads declared last, so constructed last and destructed first
1538
1539 std::thread sched_ahead_thd;
1540
1541 public:
1542
1543 // push full constructor
1544 template<typename Rep, typename Per>
1545 PushPriorityQueue(typename super::ClientInfoFunc _client_info_f,
1546 CanHandleRequestFunc _can_handle_f,
1547 HandleRequestFunc _handle_f,
1548 std::chrono::duration<Rep,Per> _idle_age,
1549 std::chrono::duration<Rep,Per> _erase_age,
1550 std::chrono::duration<Rep,Per> _check_time,
11fdf7f2
TL
1551 AtLimitParam at_limit_param = AtLimit::Wait,
1552 double anticipation_timeout = 0.0) :
7c673cae
FG
1553 super(_client_info_f,
1554 _idle_age, _erase_age, _check_time,
11fdf7f2 1555 at_limit_param, anticipation_timeout)
7c673cae
FG
1556 {
1557 can_handle_f = _can_handle_f;
1558 handle_f = _handle_f;
1559 sched_ahead_thd = std::thread(&PushPriorityQueue::run_sched_ahead, this);
1560 }
1561
1562
1563 // push convenience constructor
1564 PushPriorityQueue(typename super::ClientInfoFunc _client_info_f,
1565 CanHandleRequestFunc _can_handle_f,
1566 HandleRequestFunc _handle_f,
11fdf7f2
TL
1567 AtLimitParam at_limit_param = AtLimit::Wait,
1568 double _anticipation_timeout = 0.0) :
7c673cae
FG
1569 PushPriorityQueue(_client_info_f,
1570 _can_handle_f,
1571 _handle_f,
11fdf7f2
TL
1572 standard_idle_age,
1573 standard_erase_age,
1574 standard_check_time,
1575 at_limit_param,
1576 _anticipation_timeout)
7c673cae
FG
1577 {
1578 // empty
1579 }
1580
1581
1582 ~PushPriorityQueue() {
1583 this->finishing = true;
f67539c2
TL
1584 {
1585 std::lock_guard<std::mutex> l(sched_ahead_mtx);
1586 sched_ahead_cv.notify_one();
1587 }
7c673cae
FG
1588 sched_ahead_thd.join();
1589 }
1590
1591 public:
1592
11fdf7f2
TL
1593 int add_request(R&& request,
1594 const C& client_id,
1595 const ReqParams& req_params,
1596 const Cost cost = 1u) {
1597 return add_request(typename super::RequestRef(new R(std::move(request))),
1598 client_id,
1599 req_params,
1600 get_time(),
1601 cost);
7c673cae
FG
1602 }
1603
1604
11fdf7f2
TL
1605 int add_request(typename super::RequestRef&& request,
1606 const C& client_id,
1607 const ReqParams& req_params,
1608 const Cost cost = 1u) {
1609 return add_request(request, req_params, client_id, get_time(), cost);
7c673cae
FG
1610 }
1611
1612
11fdf7f2
TL
1613 int add_request_time(const R& request,
1614 const C& client_id,
1615 const ReqParams& req_params,
1616 const Time time,
1617 const Cost cost = 1u) {
1618 return add_request(typename super::RequestRef(new R(request)),
1619 client_id,
1620 req_params,
1621 time,
1622 cost);
7c673cae
FG
1623 }
1624
1625
11fdf7f2
TL
1626 int add_request(typename super::RequestRef&& request,
1627 const C& client_id,
1628 const ReqParams& req_params,
1629 const Time time,
1630 const Cost cost = 1u) {
7c673cae
FG
1631 typename super::DataGuard g(this->data_mtx);
1632#ifdef PROFILE
1633 add_request_timer.start();
1634#endif
11fdf7f2
TL
1635 int r = super::do_add_request(std::move(request),
1636 client_id,
1637 req_params,
1638 time,
1639 cost);
1640 if (r == 0) {
1641 schedule_request();
1642 }
7c673cae
FG
1643#ifdef PROFILE
1644 add_request_timer.stop();
1645#endif
11fdf7f2 1646 return r;
7c673cae
FG
1647 }
1648
1649
1650 void request_completed() {
1651 typename super::DataGuard g(this->data_mtx);
1652#ifdef PROFILE
1653 request_complete_timer.start();
1654#endif
1655 schedule_request();
1656#ifdef PROFILE
1657 request_complete_timer.stop();
1658#endif
1659 }
1660
1661 protected:
1662
1663 // data_mtx should be held when called; furthermore, the heap
1664 // should not be empty and the top element of the heap should
1665 // not be already handled
1666 //
1667 // NOTE: the use of "super::ClientRec" in either the template
1668 // construct or as a parameter to submit_top_request generated
1669 // a compiler error in g++ 4.8.4, when ClientRec was
1670 // "protected" rather than "public". By g++ 6.3.1 this was not
1671 // an issue. But for backwards compatibility
1672 // PriorityQueueBase::ClientRec is public.
1673 template<typename C1,
1674 IndIntruHeapData super::ClientRec::*C2,
1675 typename C3,
9f95a23c 1676 unsigned B4>
11fdf7f2
TL
1677 typename super::RequestMeta
1678 submit_top_request(IndIntruHeap<C1,typename super::ClientRec,C2,C3,B4>& heap,
1679 PhaseType phase) {
7c673cae 1680 C client_result;
11fdf7f2 1681 RequestTag tag = super::pop_process_request(heap,
7c673cae
FG
1682 [this, phase, &client_result]
1683 (const C& client,
11fdf7f2 1684 const Cost request_cost,
7c673cae
FG
1685 typename super::RequestRef& request) {
1686 client_result = client;
11fdf7f2 1687 handle_f(client, std::move(request), phase, request_cost);
7c673cae 1688 });
11fdf7f2
TL
1689 typename super::RequestMeta req(client_result, tag);
1690 return req;
7c673cae
FG
1691 }
1692
1693
1694 // data_mtx should be held when called
1695 void submit_request(typename super::HeapId heap_id) {
7c673cae
FG
1696 switch(heap_id) {
1697 case super::HeapId::reservation:
1698 // don't need to note client
1699 (void) submit_top_request(this->resv_heap, PhaseType::reservation);
1700 // unlike the other two cases, we do not reduce reservation
1701 // tags here
1702 ++this->reserv_sched_count;
1703 break;
1704 case super::HeapId::ready:
11fdf7f2
TL
1705 {
1706 auto req = submit_top_request(this->ready_heap, PhaseType::priority);
1707 super::reduce_reservation_tags(req.client_id, req.tag);
1708 }
7c673cae
FG
1709 ++this->prop_sched_count;
1710 break;
1711 default:
1712 assert(false);
1713 }
1714 } // submit_request
1715
1716
1717 // data_mtx should be held when called; unfortunately this
1718 // function has to be repeated in both push & pull
1719 // specializations
1720 typename super::NextReq next_request() {
1721 return next_request(get_time());
1722 }
1723
1724
1725 // data_mtx should be held when called; overrides member
1726 // function in base class to add check for whether a request can
1727 // be pushed to the server
1728 typename super::NextReq next_request(Time now) {
1729 if (!can_handle_f()) {
1730 typename super::NextReq result;
1731 result.type = super::NextReqType::none;
1732 return result;
1733 } else {
1734 return super::do_next_request(now);
1735 }
1736 } // next_request
1737
1738
1739 // data_mtx should be held when called
1740 void schedule_request() {
1741 typename super::NextReq next_req = next_request();
1742 switch (next_req.type) {
1743 case super::NextReqType::none:
1744 return;
1745 case super::NextReqType::future:
1746 sched_at(next_req.when_ready);
1747 break;
1748 case super::NextReqType::returning:
1749 submit_request(next_req.heap_id);
1750 break;
1751 default:
1752 assert(false);
1753 }
1754 }
1755
1756
1757 // this is the thread that handles running schedule_request at
1758 // future times when nothing can be scheduled immediately
1759 void run_sched_ahead() {
1760 std::unique_lock<std::mutex> l(sched_ahead_mtx);
1761
1762 while (!this->finishing) {
f67539c2
TL
1763 // predicate for cond.wait()
1764 const auto pred = [this] () -> bool { return this->finishing; };
1765
7c673cae 1766 if (TimeZero == sched_ahead_when) {
f67539c2 1767 sched_ahead_cv.wait(l, pred);
7c673cae 1768 } else {
f67539c2
TL
1769 // cast from Time -> duration<Time> -> Duration -> TimePoint
1770 const auto until = typename super::TimePoint{
1771 duration_cast<typename super::Duration>(
1772 std::chrono::duration<Time>{sched_ahead_when})};
1773 sched_ahead_cv.wait_until(l, until, pred);
7c673cae
FG
1774 sched_ahead_when = TimeZero;
1775 if (this->finishing) return;
1776
1777 l.unlock();
1778 if (!this->finishing) {
1779 typename super::DataGuard g(this->data_mtx);
1780 schedule_request();
1781 }
1782 l.lock();
1783 }
1784 }
1785 }
1786
1787
1788 void sched_at(Time when) {
1789 std::lock_guard<std::mutex> l(sched_ahead_mtx);
31f18b77 1790 if (this->finishing) return;
7c673cae
FG
1791 if (TimeZero == sched_ahead_when || when < sched_ahead_when) {
1792 sched_ahead_when = when;
1793 sched_ahead_cv.notify_one();
1794 }
1795 }
1796 }; // class PushPriorityQueue
1797
1798 } // namespace dmclock
1799} // namespace crimson