]> git.proxmox.com Git - ceph.git/blame - ceph/src/dmclock/src/dmclock_server.h
update download target update for octopus release
[ceph.git] / ceph / src / dmclock / src / dmclock_server.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4/*
5 * Copyright (C) 2017 Red Hat Inc.
11fdf7f2
TL
6 *
7 * Author: J. Eric Ivancich <ivancich@redhat.com>
8 *
9 * This is free software; you can redistribute it and/or modify it
10 * under the terms of the GNU Lesser General Public License version
11 * 2.1, as published by the Free Software Foundation. See file
12 * COPYING.
7c673cae
FG
13 */
14
15
16#pragma once
17
18/* COMPILATION OPTIONS
7c673cae
FG
19 *
20 * The prop_heap does not seem to be necessary. The only thing it
 * would help with is quickly finding the minimum proportion/priority
7c673cae
FG
22 * when an idle client became active. To have the code maintain the
23 * proportional heap, define USE_PROP_HEAP (i.e., compiler argument
24 * -DUSE_PROP_HEAP).
25 */
26
27#include <assert.h>
28
29#include <cmath>
30#include <memory>
31#include <map>
32#include <deque>
33#include <queue>
34#include <atomic>
35#include <mutex>
36#include <condition_variable>
37#include <thread>
38#include <iostream>
39#include <sstream>
40#include <limits>
41
42#include <boost/variant.hpp>
43
44#include "indirect_intrusive_heap.h"
45#include "run_every.h"
46#include "dmclock_util.h"
47#include "dmclock_recs.h"
48
49#ifdef PROFILE
50#include "profile.h"
51#endif
52
7c673cae
FG
53
54namespace crimson {
55
56 namespace dmclock {
57
58 namespace c = crimson;
59
60 constexpr double max_tag = std::numeric_limits<double>::is_iec559 ?
61 std::numeric_limits<double>::infinity() :
62 std::numeric_limits<double>::max();
63 constexpr double min_tag = std::numeric_limits<double>::is_iec559 ?
64 -std::numeric_limits<double>::infinity() :
65 std::numeric_limits<double>::lowest();
66 constexpr uint tag_modulo = 1000000;
67
11fdf7f2
TL
68 constexpr auto standard_idle_age = std::chrono::seconds(300);
69 constexpr auto standard_erase_age = std::chrono::seconds(600);
70 constexpr auto standard_check_time = std::chrono::seconds(60);
71 constexpr auto aggressive_check_time = std::chrono::seconds(5);
72 constexpr uint standard_erase_max = 2000;
73
74 enum class AtLimit {
75 // requests are delayed until the limit is restored
76 Wait,
77 // requests are allowed to exceed their limit, if all other reservations
78 // are met and below their limits
79 Allow,
80 // if an incoming request would exceed its limit, add_request() will
81 // reject it with EAGAIN instead of adding it to the queue. cannot be used
82 // with DelayedTagCalc, because add_request() needs an accurate tag
83 Reject,
84 };
85
86 // when AtLimit::Reject is used, only start rejecting requests once their
87 // limit is above this threshold. requests under this threshold are
88 // enqueued and processed like AtLimit::Wait
89 using RejectThreshold = Time;
90
91 // the AtLimit constructor parameter can either accept AtLimit or a value
92 // for RejectThreshold (which implies AtLimit::Reject)
93 using AtLimitParam = boost::variant<AtLimit, RejectThreshold>;
94
7c673cae 95 struct ClientInfo {
11fdf7f2
TL
96 double reservation; // minimum
97 double weight; // proportional
98 double limit; // maximum
7c673cae
FG
99
100 // multiplicative inverses of above, which we use in calculations
101 // and don't want to recalculate repeatedly
11fdf7f2
TL
102 double reservation_inv;
103 double weight_inv;
104 double limit_inv;
7c673cae
FG
105
106 // order parameters -- min, "normal", max
107 ClientInfo(double _reservation, double _weight, double _limit) :
108 reservation(_reservation),
109 weight(_weight),
110 limit(_limit),
111 reservation_inv(0.0 == reservation ? 0.0 : 1.0 / reservation),
112 weight_inv( 0.0 == weight ? 0.0 : 1.0 / weight),
113 limit_inv( 0.0 == limit ? 0.0 : 1.0 / limit)
114 {
115 // empty
116 }
117
118
119 friend std::ostream& operator<<(std::ostream& out,
120 const ClientInfo& client) {
121 out <<
122 "{ ClientInfo:: r:" << client.reservation <<
123 " w:" << std::fixed << client.weight <<
124 " l:" << std::fixed << client.limit <<
125 " 1/r:" << std::fixed << client.reservation_inv <<
126 " 1/w:" << std::fixed << client.weight_inv <<
127 " 1/l:" << std::fixed << client.limit_inv <<
128 " }";
129 return out;
130 }
131 }; // class ClientInfo
132
133
134 struct RequestTag {
11fdf7f2
TL
135 double reservation;
136 double proportion;
137 double limit;
138 uint32_t delta;
139 uint32_t rho;
140 Cost cost;
141 bool ready; // true when within limit
142 Time arrival;
7c673cae
FG
143
144 RequestTag(const RequestTag& prev_tag,
145 const ClientInfo& client,
11fdf7f2
TL
146 const uint32_t _delta,
147 const uint32_t _rho,
31f18b77 148 const Time time,
11fdf7f2
TL
149 const Cost _cost = 1u,
150 const double anticipation_timeout = 0.0) :
151 delta(_delta),
152 rho(_rho),
153 cost(_cost),
154 ready(false),
155 arrival(time)
7c673cae 156 {
11fdf7f2
TL
157 assert(cost > 0);
158 Time max_time = time;
159 if (time - anticipation_timeout < prev_tag.arrival)
160 max_time -= anticipation_timeout;
161
162 reservation = tag_calc(max_time,
163 prev_tag.reservation,
164 client.reservation_inv,
165 rho,
166 true,
167 cost);
168 proportion = tag_calc(max_time,
169 prev_tag.proportion,
170 client.weight_inv,
171 delta,
172 true,
173 cost);
174 limit = tag_calc(max_time,
175 prev_tag.limit,
176 client.limit_inv,
177 delta,
178 false,
179 cost);
180
7c673cae
FG
181 assert(reservation < max_tag || proportion < max_tag);
182 }
183
31f18b77
FG
184 RequestTag(const RequestTag& prev_tag,
185 const ClientInfo& client,
186 const ReqParams req_params,
187 const Time time,
11fdf7f2
TL
188 const Cost cost = 1u,
189 const double anticipation_timeout = 0.0) :
190 RequestTag(prev_tag, client, req_params.delta, req_params.rho, time,
191 cost, anticipation_timeout)
31f18b77
FG
192 { /* empty */ }
193
11fdf7f2
TL
194 RequestTag(const double _res, const double _prop, const double _lim,
195 const Time _arrival,
196 const uint32_t _delta = 0,
197 const uint32_t _rho = 0,
198 const Cost _cost = 1u) :
7c673cae
FG
199 reservation(_res),
200 proportion(_prop),
201 limit(_lim),
11fdf7f2
TL
202 delta(_delta),
203 rho(_rho),
204 cost(_cost),
205 ready(false),
206 arrival(_arrival)
7c673cae 207 {
11fdf7f2 208 assert(cost > 0);
7c673cae
FG
209 assert(reservation < max_tag || proportion < max_tag);
210 }
211
212 RequestTag(const RequestTag& other) :
213 reservation(other.reservation),
214 proportion(other.proportion),
215 limit(other.limit),
11fdf7f2
TL
216 delta(other.delta),
217 rho(other.rho),
218 cost(other.cost),
219 ready(other.ready),
220 arrival(other.arrival)
221 { /* empty */ }
7c673cae
FG
222
223 static std::string format_tag_change(double before, double after) {
224 if (before == after) {
225 return std::string("same");
226 } else {
227 std::stringstream ss;
228 ss << format_tag(before) << "=>" << format_tag(after);
229 return ss.str();
230 }
231 }
232
233 static std::string format_tag(double value) {
234 if (max_tag == value) {
235 return std::string("max");
236 } else if (min_tag == value) {
237 return std::string("min");
238 } else {
239 return format_time(value, tag_modulo);
240 }
241 }
242
243 private:
244
31f18b77 245 static double tag_calc(const Time time,
11fdf7f2
TL
246 const double prev,
247 const double increment,
248 const uint32_t dist_req_val,
249 const bool extreme_is_high,
250 const Cost cost) {
7c673cae
FG
251 if (0.0 == increment) {
252 return extreme_is_high ? max_tag : min_tag;
253 } else {
11fdf7f2
TL
254 // insure 64-bit arithmetic before conversion to double
255 double tag_increment = increment * (uint64_t(dist_req_val) + cost);
256 return std::max(time, prev + tag_increment);
7c673cae
FG
257 }
258 }
259
260 friend std::ostream& operator<<(std::ostream& out,
261 const RequestTag& tag) {
262 out <<
263 "{ RequestTag:: ready:" << (tag.ready ? "true" : "false") <<
264 " r:" << format_tag(tag.reservation) <<
265 " p:" << format_tag(tag.proportion) <<
266 " l:" << format_tag(tag.limit) <<
267#if 0 // try to resolve this to make sure Time is operator<<'able.
7c673cae 268 " arrival:" << tag.arrival <<
7c673cae
FG
269#endif
270 " }";
271 return out;
272 }
273 }; // class RequestTag
274
11fdf7f2
TL
275 // C is client identifier type, R is request type,
276 // IsDelayed controls whether tag calculation is delayed until the request
277 // reaches the front of its queue. This is an optimization over the
278 // originally published dmclock algorithm, allowing it to use the most
279 // recent values of rho and delta.
280 // U1 determines whether to use client information function dynamically,
281 // B is heap branching factor
282 template<typename C, typename R, bool IsDelayed, bool U1, uint B>
7c673cae 283 class PriorityQueueBase {
d2e6a577
FG
284 // we don't want to include gtest.h just for FRIEND_TEST
285 friend class dmclock_server_client_idle_erase_Test;
7c673cae 286
11fdf7f2
TL
287 // types used for tag dispatch to select between implementations
288 using TagCalc = std::integral_constant<bool, IsDelayed>;
289 using DelayedTagCalc = std::true_type;
290 using ImmediateTagCalc = std::false_type;
291
7c673cae
FG
292 public:
293
294 using RequestRef = std::unique_ptr<R>;
295
296 protected:
297
298 using TimePoint = decltype(std::chrono::steady_clock::now());
299 using Duration = std::chrono::milliseconds;
300 using MarkPoint = std::pair<TimePoint,Counter>;
301
302 enum class ReadyOption {ignore, lowers, raises};
303
304 // forward decl for friend decls
305 template<double RequestTag::*, ReadyOption, bool>
306 struct ClientCompare;
307
308 class ClientReq {
309 friend PriorityQueueBase;
310
311 RequestTag tag;
312 C client_id;
313 RequestRef request;
314
315 public:
316
317 ClientReq(const RequestTag& _tag,
318 const C& _client_id,
319 RequestRef&& _request) :
320 tag(_tag),
321 client_id(_client_id),
322 request(std::move(_request))
323 {
324 // empty
325 }
326
327 friend std::ostream& operator<<(std::ostream& out, const ClientReq& c) {
328 out << "{ ClientReq:: tag:" << c.tag << " client:" <<
329 c.client_id << " }";
330 return out;
331 }
332 }; // class ClientReq
333
11fdf7f2
TL
334 struct RequestMeta {
335 C client_id;
336 RequestTag tag;
337
338 RequestMeta(const C& _client_id, const RequestTag& _tag) :
339 client_id(_client_id),
340 tag(_tag)
341 {
342 // empty
343 }
344 };
345
7c673cae
FG
346 public:
347
348 // NOTE: ClientRec is in the "public" section for compatibility
349 // with g++ 4.8.4, which complains if it's not. By g++ 6.3.1
350 // ClientRec could be "protected" with no issue. [See comments
351 // associated with function submit_top_request.]
352 class ClientRec {
11fdf7f2 353 friend PriorityQueueBase<C,R,IsDelayed,U1,B>;
7c673cae
FG
354
355 C client;
356 RequestTag prev_tag;
357 std::deque<ClientReq> requests;
358
359 // amount added from the proportion tag as a result of
360 // an idle client becoming unidle
361 double prop_delta = 0.0;
362
11fdf7f2
TL
363 c::IndIntruHeapData reserv_heap_data {};
364 c::IndIntruHeapData lim_heap_data {};
365 c::IndIntruHeapData ready_heap_data {};
7c673cae 366#if USE_PROP_HEAP
11fdf7f2 367 c::IndIntruHeapData prop_heap_data {};
7c673cae
FG
368#endif
369
370 public:
371
11fdf7f2 372 const ClientInfo* info;
7c673cae
FG
373 bool idle;
374 Counter last_tick;
375 uint32_t cur_rho;
376 uint32_t cur_delta;
377
378 ClientRec(C _client,
11fdf7f2 379 const ClientInfo* _info,
7c673cae
FG
380 Counter current_tick) :
381 client(_client),
382 prev_tag(0.0, 0.0, 0.0, TimeZero),
383 info(_info),
384 idle(true),
385 last_tick(current_tick),
386 cur_rho(1),
387 cur_delta(1)
388 {
389 // empty
390 }
391
392 inline const RequestTag& get_req_tag() const {
393 return prev_tag;
394 }
395
396 static inline void assign_unpinned_tag(double& lhs, const double rhs) {
397 if (rhs != max_tag && rhs != min_tag) {
398 lhs = rhs;
399 }
400 }
401
402 inline void update_req_tag(const RequestTag& _prev,
403 const Counter& _tick) {
404 assign_unpinned_tag(prev_tag.reservation, _prev.reservation);
405 assign_unpinned_tag(prev_tag.limit, _prev.limit);
406 assign_unpinned_tag(prev_tag.proportion, _prev.proportion);
11fdf7f2 407 prev_tag.arrival = _prev.arrival;
7c673cae
FG
408 last_tick = _tick;
409 }
410
11fdf7f2
TL
411 inline void add_request(const RequestTag& tag, RequestRef&& request) {
412 requests.emplace_back(tag, client, std::move(request));
7c673cae
FG
413 }
414
415 inline const ClientReq& next_request() const {
416 return requests.front();
417 }
418
419 inline ClientReq& next_request() {
420 return requests.front();
421 }
422
423 inline void pop_request() {
424 requests.pop_front();
425 }
426
427 inline bool has_request() const {
428 return !requests.empty();
429 }
430
431 inline size_t request_count() const {
432 return requests.size();
433 }
434
435 // NB: because a deque is the underlying structure, this
436 // operation might be expensive
11fdf7f2 437 bool remove_by_req_filter_fw(std::function<bool(RequestRef&&)> filter_accum) {
7c673cae
FG
438 bool any_removed = false;
439 for (auto i = requests.begin();
440 i != requests.end();
441 /* no inc */) {
11fdf7f2 442 if (filter_accum(std::move(i->request))) {
7c673cae
FG
443 any_removed = true;
444 i = requests.erase(i);
445 } else {
446 ++i;
447 }
448 }
449 return any_removed;
450 }
451
452 // NB: because a deque is the underlying structure, this
453 // operation might be expensive
11fdf7f2 454 bool remove_by_req_filter_bw(std::function<bool(RequestRef&&)> filter_accum) {
7c673cae
FG
455 bool any_removed = false;
456 for (auto i = requests.rbegin();
457 i != requests.rend();
458 /* no inc */) {
11fdf7f2 459 if (filter_accum(std::move(i->request))) {
7c673cae
FG
460 any_removed = true;
461 i = decltype(i){ requests.erase(std::next(i).base()) };
462 } else {
463 ++i;
464 }
465 }
466 return any_removed;
467 }
468
469 inline bool
11fdf7f2 470 remove_by_req_filter(std::function<bool(RequestRef&&)> filter_accum,
7c673cae
FG
471 bool visit_backwards) {
472 if (visit_backwards) {
473 return remove_by_req_filter_bw(filter_accum);
474 } else {
475 return remove_by_req_filter_fw(filter_accum);
476 }
477 }
478
479 friend std::ostream&
480 operator<<(std::ostream& out,
11fdf7f2 481 const typename PriorityQueueBase::ClientRec& e) {
7c673cae
FG
482 out << "{ ClientRec::" <<
483 " client:" << e.client <<
484 " prev_tag:" << e.prev_tag <<
485 " req_count:" << e.requests.size() <<
486 " top_req:";
487 if (e.has_request()) {
488 out << e.next_request();
489 } else {
490 out << "none";
491 }
492 out << " }";
493
494 return out;
495 }
496 }; // class ClientRec
497
498 using ClientRecRef = std::shared_ptr<ClientRec>;
499
500 // when we try to get the next request, we'll be in one of three
501 // situations -- we'll have one to return, have one that can
502 // fire in the future, or not have any
503 enum class NextReqType { returning, future, none };
504
505 // specifies which queue next request will get popped from
506 enum class HeapId { reservation, ready };
507
508 // this is returned from next_req to tell the caller the situation
509 struct NextReq {
510 NextReqType type;
511 union {
512 HeapId heap_id;
513 Time when_ready;
514 };
11fdf7f2
TL
515
516 inline explicit NextReq() :
517 type(NextReqType::none)
518 { }
519
520 inline NextReq(HeapId _heap_id) :
521 type(NextReqType::returning),
522 heap_id(_heap_id)
523 { }
524
525 inline NextReq(Time _when_ready) :
526 type(NextReqType::future),
527 when_ready(_when_ready)
528 { }
529
530 // calls to this are clearer than calls to the default
531 // constructor
532 static inline NextReq none() {
533 return NextReq();
534 }
7c673cae
FG
535 };
536
537
538 // a function that can be called to look up client information
11fdf7f2 539 using ClientInfoFunc = std::function<const ClientInfo*(const C&)>;
7c673cae
FG
540
541
542 bool empty() const {
543 DataGuard g(data_mtx);
544 return (resv_heap.empty() || ! resv_heap.top().has_request());
545 }
546
547
548 size_t client_count() const {
549 DataGuard g(data_mtx);
550 return resv_heap.size();
551 }
552
553
554 size_t request_count() const {
555 DataGuard g(data_mtx);
556 size_t total = 0;
557 for (auto i = resv_heap.cbegin(); i != resv_heap.cend(); ++i) {
558 total += i->request_count();
559 }
560 return total;
561 }
562
563
11fdf7f2 564 bool remove_by_req_filter(std::function<bool(RequestRef&&)> filter_accum,
7c673cae
FG
565 bool visit_backwards = false) {
566 bool any_removed = false;
567 DataGuard g(data_mtx);
568 for (auto i : client_map) {
569 bool modified =
570 i.second->remove_by_req_filter(filter_accum, visit_backwards);
571 if (modified) {
572 resv_heap.adjust(*i.second);
573 limit_heap.adjust(*i.second);
574 ready_heap.adjust(*i.second);
575#if USE_PROP_HEAP
576 prop_heap.adjust(*i.second);
577#endif
578 any_removed = true;
579 }
580 }
581 return any_removed;
582 }
583
584
585 // use as a default value when no accumulator is provide
11fdf7f2 586 static void request_sink(RequestRef&& req) {
7c673cae
FG
587 // do nothing
588 }
589
590
591 void remove_by_client(const C& client,
592 bool reverse = false,
11fdf7f2 593 std::function<void (RequestRef&&)> accum = request_sink) {
7c673cae
FG
594 DataGuard g(data_mtx);
595
596 auto i = client_map.find(client);
597
598 if (i == client_map.end()) return;
599
600 if (reverse) {
601 for (auto j = i->second->requests.rbegin();
602 j != i->second->requests.rend();
603 ++j) {
11fdf7f2 604 accum(std::move(j->request));
7c673cae
FG
605 }
606 } else {
607 for (auto j = i->second->requests.begin();
608 j != i->second->requests.end();
609 ++j) {
11fdf7f2 610 accum(std::move(j->request));
7c673cae
FG
611 }
612 }
613
614 i->second->requests.clear();
615
616 resv_heap.adjust(*i->second);
617 limit_heap.adjust(*i->second);
618 ready_heap.adjust(*i->second);
619#if USE_PROP_HEAP
620 prop_heap.adjust(*i->second);
621#endif
622 }
623
624
625 uint get_heap_branching_factor() const {
626 return B;
627 }
628
629
11fdf7f2
TL
630 void update_client_info(const C& client_id) {
631 DataGuard g(data_mtx);
632 auto client_it = client_map.find(client_id);
633 if (client_map.end() != client_it) {
634 ClientRec& client = (*client_it->second);
635 client.info = client_info_f(client_id);
636 }
637 }
638
639
640 void update_client_infos() {
641 DataGuard g(data_mtx);
642 for (auto i : client_map) {
643 i.second->info = client_info_f(i.second->client);
644 }
645 }
646
647
7c673cae
FG
648 friend std::ostream& operator<<(std::ostream& out,
649 const PriorityQueueBase& q) {
650 std::lock_guard<decltype(q.data_mtx)> guard(q.data_mtx);
651
652 out << "{ PriorityQueue::";
653 for (const auto& c : q.client_map) {
654 out << " { client:" << c.first << ", record:" << *c.second <<
655 " }";
656 }
657 if (!q.resv_heap.empty()) {
658 const auto& resv = q.resv_heap.top();
659 out << " { reservation_top:" << resv << " }";
660 const auto& ready = q.ready_heap.top();
661 out << " { ready_top:" << ready << " }";
662 const auto& limit = q.limit_heap.top();
663 out << " { limit_top:" << limit << " }";
664 } else {
665 out << " HEAPS-EMPTY";
666 }
667 out << " }";
668
669 return out;
670 }
671
672 // for debugging
673 void display_queues(std::ostream& out,
674 bool show_res = true,
675 bool show_lim = true,
676 bool show_ready = true,
677 bool show_prop = true) const {
678 auto filter = [](const ClientRec& e)->bool { return true; };
679 DataGuard g(data_mtx);
680 if (show_res) {
681 resv_heap.display_sorted(out << "RESER:", filter);
682 }
683 if (show_lim) {
684 limit_heap.display_sorted(out << "LIMIT:", filter);
685 }
686 if (show_ready) {
687 ready_heap.display_sorted(out << "READY:", filter);
688 }
689#if USE_PROP_HEAP
690 if (show_prop) {
691 prop_heap.display_sorted(out << "PROPO:", filter);
692 }
693#endif
694 } // display_queues
695
696
697 protected:
698
699 // The ClientCompare functor is essentially doing a precedes?
700 // operator, returning true if and only if the first parameter
701 // must precede the second parameter. If the second must precede
702 // the first, or if they are equivalent, false should be
703 // returned. The reason for this behavior is that it will be
704 // called to test if two items are out of order and if true is
705 // returned it will reverse the items. Therefore false is the
706 // default return when it doesn't matter to prevent unnecessary
707 // re-ordering.
708 //
709 // The template is supporting variations in sorting based on the
710 // heap in question and allowing these variations to be handled
711 // at compile-time.
712 //
713 // tag_field determines which tag is being used for comparison
714 //
715 // ready_opt determines how the ready flag influences the sort
716 //
717 // use_prop_delta determines whether the proportional delta is
718 // added in for comparison
719 template<double RequestTag::*tag_field,
720 ReadyOption ready_opt,
721 bool use_prop_delta>
722 struct ClientCompare {
723 bool operator()(const ClientRec& n1, const ClientRec& n2) const {
724 if (n1.has_request()) {
725 if (n2.has_request()) {
726 const auto& t1 = n1.next_request().tag;
727 const auto& t2 = n2.next_request().tag;
728 if (ReadyOption::ignore == ready_opt || t1.ready == t2.ready) {
729 // if we don't care about ready or the ready values are the same
730 if (use_prop_delta) {
731 return (t1.*tag_field + n1.prop_delta) <
732 (t2.*tag_field + n2.prop_delta);
733 } else {
734 return t1.*tag_field < t2.*tag_field;
735 }
736 } else if (ReadyOption::raises == ready_opt) {
737 // use_ready == true && the ready fields are different
738 return t1.ready;
739 } else {
740 return t2.ready;
741 }
742 } else {
743 // n1 has request but n2 does not
744 return true;
745 }
746 } else if (n2.has_request()) {
747 // n2 has request but n1 does not
748 return false;
749 } else {
750 // both have none; keep stable w false
751 return false;
752 }
753 }
754 };
755
11fdf7f2
TL
756 ClientInfoFunc client_info_f;
757 static constexpr bool is_dynamic_cli_info_f = U1;
7c673cae
FG
758
759 mutable std::mutex data_mtx;
760 using DataGuard = std::lock_guard<decltype(data_mtx)>;
761
762 // stable mapping between client ids and client queues
763 std::map<C,ClientRecRef> client_map;
764
765 c::IndIntruHeap<ClientRecRef,
766 ClientRec,
767 &ClientRec::reserv_heap_data,
768 ClientCompare<&RequestTag::reservation,
769 ReadyOption::ignore,
770 false>,
771 B> resv_heap;
772#if USE_PROP_HEAP
773 c::IndIntruHeap<ClientRecRef,
774 ClientRec,
775 &ClientRec::prop_heap_data,
776 ClientCompare<&RequestTag::proportion,
777 ReadyOption::ignore,
778 true>,
779 B> prop_heap;
780#endif
781 c::IndIntruHeap<ClientRecRef,
782 ClientRec,
783 &ClientRec::lim_heap_data,
784 ClientCompare<&RequestTag::limit,
785 ReadyOption::lowers,
786 false>,
787 B> limit_heap;
788 c::IndIntruHeap<ClientRecRef,
789 ClientRec,
790 &ClientRec::ready_heap_data,
791 ClientCompare<&RequestTag::proportion,
792 ReadyOption::raises,
793 true>,
794 B> ready_heap;
795
11fdf7f2
TL
796 AtLimit at_limit;
797 RejectThreshold reject_threshold = 0;
798
799 double anticipation_timeout;
7c673cae
FG
800
801 std::atomic_bool finishing;
802
803 // every request creates a tick
804 Counter tick = 0;
805
806 // performance data collection
807 size_t reserv_sched_count = 0;
808 size_t prop_sched_count = 0;
809 size_t limit_break_sched_count = 0;
810
811 Duration idle_age;
812 Duration erase_age;
813 Duration check_time;
814 std::deque<MarkPoint> clean_mark_points;
11fdf7f2
TL
815 // max number of clients to erase at a time
816 Counter erase_max;
817 // unfinished last erase point
818 Counter last_erase_point = 0;
7c673cae
FG
819
820 // NB: All threads declared at end, so they're destructed first!
821
822 std::unique_ptr<RunEvery> cleaning_job;
823
11fdf7f2
TL
824 // helper function to return the value of a variant if it matches the
825 // given type T, or a default value of T otherwise
826 template <typename T, typename Variant>
827 static T get_or_default(const Variant& param, T default_value) {
828 const T *p = boost::get<T>(&param);
829 return p ? *p : default_value;
830 }
7c673cae
FG
831
832 // COMMON constructor that others feed into; we can accept three
833 // different variations of durations
834 template<typename Rep, typename Per>
835 PriorityQueueBase(ClientInfoFunc _client_info_f,
836 std::chrono::duration<Rep,Per> _idle_age,
837 std::chrono::duration<Rep,Per> _erase_age,
838 std::chrono::duration<Rep,Per> _check_time,
11fdf7f2
TL
839 AtLimitParam at_limit_param,
840 double _anticipation_timeout) :
7c673cae 841 client_info_f(_client_info_f),
11fdf7f2
TL
842 at_limit(get_or_default(at_limit_param, AtLimit::Reject)),
843 reject_threshold(get_or_default(at_limit_param, RejectThreshold{0})),
844 anticipation_timeout(_anticipation_timeout),
7c673cae
FG
845 finishing(false),
846 idle_age(std::chrono::duration_cast<Duration>(_idle_age)),
847 erase_age(std::chrono::duration_cast<Duration>(_erase_age)),
11fdf7f2
TL
848 check_time(std::chrono::duration_cast<Duration>(_check_time)),
849 erase_max(standard_erase_max)
7c673cae
FG
850 {
851 assert(_erase_age >= _idle_age);
852 assert(_check_time < _idle_age);
11fdf7f2
TL
853 // AtLimit::Reject depends on ImmediateTagCalc
854 assert(at_limit != AtLimit::Reject || !IsDelayed);
7c673cae
FG
855 cleaning_job =
856 std::unique_ptr<RunEvery>(
857 new RunEvery(check_time,
858 std::bind(&PriorityQueueBase::do_clean, this)));
859 }
860
861
862 ~PriorityQueueBase() {
863 finishing = true;
864 }
865
866
11fdf7f2
TL
867 inline const ClientInfo* get_cli_info(ClientRec& client) const {
868 if (is_dynamic_cli_info_f) {
869 client.info = client_info_f(client.client);
870 }
871 return client.info;
872 }
873
7c673cae 874 // data_mtx must be held by caller
11fdf7f2
TL
875 RequestTag initial_tag(DelayedTagCalc delayed, ClientRec& client,
876 const ReqParams& params, Time time, Cost cost) {
877 RequestTag tag(0, 0, 0, time, 0, 0, cost);
7c673cae 878
11fdf7f2
TL
879 // only calculate a tag if the request is going straight to the front
880 if (!client.has_request()) {
881 const ClientInfo* client_info = get_cli_info(client);
882 assert(client_info);
883 tag = RequestTag(client.get_req_tag(), *client_info,
884 params, time, cost, anticipation_timeout);
7c673cae 885
11fdf7f2
TL
886 // copy tag to previous tag for client
887 client.update_req_tag(tag, tick);
888 }
889 return tag;
890 }
891
892 // data_mtx must be held by caller
893 RequestTag initial_tag(ImmediateTagCalc imm, ClientRec& client,
894 const ReqParams& params, Time time, Cost cost) {
895 // calculate the tag unconditionally
896 const ClientInfo* client_info = get_cli_info(client);
897 assert(client_info);
898 RequestTag tag(client.get_req_tag(), *client_info,
899 params, time, cost, anticipation_timeout);
900
901 // copy tag to previous tag for client
902 client.update_req_tag(tag, tick);
903 return tag;
904 }
905
906 // data_mtx must be held by caller. returns 0 on success. when using
907 // AtLimit::Reject, requests that would exceed their limit are rejected
908 // with EAGAIN, and the queue will not take ownership of the given
909 // 'request' argument
910 int do_add_request(RequestRef&& request,
911 const C& client_id,
912 const ReqParams& req_params,
913 const Time time,
914 const Cost cost = 1u) {
915 ++tick;
916
917 auto insert = client_map.emplace(client_id, ClientRecRef{});
918 if (insert.second) {
919 // new client entry
920 const ClientInfo* info = client_info_f(client_id);
921 auto client_rec = std::make_shared<ClientRec>(client_id, info, tick);
7c673cae
FG
922 resv_heap.push(client_rec);
923#if USE_PROP_HEAP
924 prop_heap.push(client_rec);
925#endif
926 limit_heap.push(client_rec);
927 ready_heap.push(client_rec);
11fdf7f2 928 insert.first->second = std::move(client_rec);
7c673cae
FG
929 }
930
931 // for convenience, we'll create a reference to the shared pointer
11fdf7f2 932 ClientRec& client = *insert.first->second;
7c673cae
FG
933
934 if (client.idle) {
935 // We need to do an adjustment so that idle clients compete
936 // fairly on proportional tags since those tags may have
937 // drifted from real-time. Either use the lowest existing
938 // proportion tag -- O(1) -- or the client with the lowest
939 // previous proportion tag -- O(n) where n = # clients.
940 //
941 // So we don't have to maintain a propotional queue that
942 // keeps the minimum on proportional tag alone (we're
943 // instead using a ready queue), we'll have to check each
944 // client.
945 //
946 // The alternative would be to maintain a proportional queue
947 // (define USE_PROP_TAG) and do an O(1) operation here.
948
949 // Was unable to confirm whether equality testing on
950 // std::numeric_limits<double>::max() is guaranteed, so
951 // we'll use a compile-time calculated trigger that is one
952 // third the max, which should be much larger than any
953 // expected organic value.
954 constexpr double lowest_prop_tag_trigger =
955 std::numeric_limits<double>::max() / 3.0;
956
957 double lowest_prop_tag = std::numeric_limits<double>::max();
958 for (auto const &c : client_map) {
959 // don't use ourselves (or anything else that might be
960 // listed as idle) since we're now in the map
961 if (!c.second->idle) {
962 double p;
963 // use either lowest proportion tag or previous proportion tag
964 if (c.second->has_request()) {
965 p = c.second->next_request().tag.proportion +
966 c.second->prop_delta;
967 } else {
968 p = c.second->get_req_tag().proportion + c.second->prop_delta;
969 }
970
971 if (p < lowest_prop_tag) {
972 lowest_prop_tag = p;
973 }
974 }
975 }
976
977 // if this conditional does not fire, it
978 if (lowest_prop_tag < lowest_prop_tag_trigger) {
979 client.prop_delta = lowest_prop_tag - time;
980 }
981 client.idle = false;
982 } // if this client was idle
983
11fdf7f2 984 RequestTag tag = initial_tag(TagCalc{}, client, req_params, time, cost);
7c673cae 985
11fdf7f2
TL
986 if (at_limit == AtLimit::Reject &&
987 tag.limit > time + reject_threshold) {
988 // if the client is over its limit, reject it here
989 return EAGAIN;
7c673cae 990 }
7c673cae 991
11fdf7f2 992 client.add_request(tag, std::move(request));
7c673cae
FG
993 if (1 == client.requests.size()) {
994 // NB: can the following 4 calls to adjust be changed
995 // promote? Can adding a request ever demote a client in the
996 // heaps?
997 resv_heap.adjust(client);
998 limit_heap.adjust(client);
999 ready_heap.adjust(client);
1000#if USE_PROP_HEAP
1001 prop_heap.adjust(client);
1002#endif
1003 }
1004
1005 client.cur_rho = req_params.rho;
1006 client.cur_delta = req_params.delta;
1007
1008 resv_heap.adjust(client);
1009 limit_heap.adjust(client);
1010 ready_heap.adjust(client);
1011#if USE_PROP_HEAP
1012 prop_heap.adjust(client);
1013#endif
11fdf7f2
TL
1014 return 0;
1015 } // do_add_request
1016
1017 // data_mtx must be held by caller
1018 void update_next_tag(DelayedTagCalc delayed, ClientRec& top,
1019 const RequestTag& tag) {
1020 if (top.has_request()) {
1021 // perform delayed tag calculation on the next request
1022 ClientReq& next_first = top.next_request();
1023 const ClientInfo* client_info = get_cli_info(top);
1024 assert(client_info);
1025 next_first.tag = RequestTag(tag, *client_info,
1026 top.cur_delta, top.cur_rho,
1027 next_first.tag.arrival,
1028 next_first.tag.cost,
1029 anticipation_timeout);
1030 // copy tag to previous tag for client
1031 top.update_req_tag(next_first.tag, tick);
1032 }
1033 }
7c673cae 1034
11fdf7f2
TL
1035 void update_next_tag(ImmediateTagCalc imm, ClientRec& top,
1036 const RequestTag& tag) {
1037 // the next tag was already calculated on insertion
1038 }
7c673cae
FG
1039
      // data_mtx should be held when called; top of heap should have
      // a ready request
      //
      // Removes the next request of the given heap's top client, hands
      // it to `process`, and re-positions that client in all of the
      // scheduling heaps. Returns the tag of the dispatched request so
      // the caller can, e.g., reduce reservation tags after a
      // proportion-phase dispatch.
      template<typename C1, IndIntruHeapData ClientRec::*C2, typename C3>
      RequestTag pop_process_request(IndIntruHeap<C1, ClientRec, C2, C3, B>& heap,
                                     std::function<void(const C& client,
                                                        const Cost cost,
                                                        RequestRef& request)> process) {
        // gain access to data
        ClientRec& top = heap.top();

        // capture cost and tag, and take ownership of the request,
        // before the pop below destroys the queue entry
        Cost request_cost = top.next_request().tag.cost;
        RequestRef request = std::move(top.next_request().request);
        RequestTag tag = top.next_request().tag;

        // pop request and adjust heaps
        top.pop_request();

        // TagCalc is either DelayedTagCalc or ImmediateTagCalc; in the
        // delayed case this computes the tag of the client's new head
        // request, in the immediate case it is a no-op
        update_next_tag(TagCalc{}, top, tag);

        // re-position the client in each heap; demote/adjust choices
        // reflect how each key can change after a pop (limit may move
        // either direction, hence adjust rather than demote)
        resv_heap.demote(top);
        limit_heap.adjust(top);
#if USE_PROP_HEAP
        prop_heap.demote(top);
#endif
        ready_heap.demote(top);

        // process
        process(top.client, request_cost, request);

        return tag;
      } // pop_process_request
1071
1072
11fdf7f2
TL
1073 // data_mtx must be held by caller
1074 void reduce_reservation_tags(DelayedTagCalc delayed, ClientRec& client,
1075 const RequestTag& tag) {
1076 if (!client.requests.empty()) {
1077 // only maintain a tag for the first request
1078 auto& r = client.requests.front();
1079 r.tag.reservation -=
1080 client.info->reservation_inv * std::max(uint32_t(1), tag.rho);
1081 }
1082 }
1083
7c673cae 1084 // data_mtx should be held when called
11fdf7f2
TL
1085 void reduce_reservation_tags(ImmediateTagCalc imm, ClientRec& client,
1086 const RequestTag& tag) {
1087 double res_offset =
1088 client.info->reservation_inv * std::max(uint32_t(1), tag.rho);
7c673cae 1089 for (auto& r : client.requests) {
11fdf7f2 1090 r.tag.reservation -= res_offset;
7c673cae 1091 }
7c673cae
FG
1092 }
1093
7c673cae 1094 // data_mtx should be held when called
11fdf7f2 1095 void reduce_reservation_tags(const C& client_id, const RequestTag& tag) {
7c673cae
FG
1096 auto client_it = client_map.find(client_id);
1097
1098 // means the client was cleaned from map; should never happen
1099 // as long as cleaning times are long enough
1100 assert(client_map.end() != client_it);
11fdf7f2
TL
1101 ClientRec& client = *client_it->second;
1102 reduce_reservation_tags(TagCalc{}, client, tag);
1103
1104 // don't forget to update previous tag
1105 client.prev_tag.reservation -=
1106 client.info->reservation_inv * std::max(uint32_t(1), tag.rho);
1107 resv_heap.promote(client);
7c673cae
FG
1108 }
1109
1110
      // data_mtx should be held when called
      //
      // Core scheduling decision, in dmClock order:
      //   1. constraint-based (reservation) if a reservation tag is due;
      //   2. weight-based among clients marked ready and under limit;
      //   3. if limit-breaking is allowed, lowest proportion tag, then
      //      lowest reservation tag;
      //   4. otherwise report the earliest future time at which either
      //      a reservation or a limit becomes due, or none at all.
      NextReq do_next_request(Time now) {
        // if reservation queue is empty, all are empty (i.e., no
        // active clients)
        if(resv_heap.empty()) {
          return NextReq::none();
        }

        // try constraint (reservation) based scheduling

        auto& reserv = resv_heap.top();
        if (reserv.has_request() &&
            reserv.next_request().tag.reservation <= now) {
          return NextReq(HeapId::reservation);
        }

        // no existing reservations before now, so try weight-based
        // scheduling

        // all items that are within limit are eligible based on
        // priority; mark them ready and move them between heaps
        auto limits = &limit_heap.top();
        while (limits->has_request() &&
               !limits->next_request().tag.ready &&
               limits->next_request().tag.limit <= now) {
          limits->next_request().tag.ready = true;
          ready_heap.promote(*limits);
          limit_heap.demote(*limits);

          // re-read the top, which may have changed after the demote
          limits = &limit_heap.top();
        }

        auto& readys = ready_heap.top();
        if (readys.has_request() &&
            readys.next_request().tag.ready &&
            readys.next_request().tag.proportion < max_tag) {
          return NextReq(HeapId::ready);
        }

        // if nothing is schedulable by reservation or
        // proportion/weight, and if we allow limit break, try to
        // schedule something with the lowest proportion tag or
        // alternatively lowest reservation tag.
        if (at_limit == AtLimit::Allow) {
          if (readys.has_request() &&
              readys.next_request().tag.proportion < max_tag) {
            return NextReq(HeapId::ready);
          } else if (reserv.has_request() &&
                     reserv.next_request().tag.reservation < max_tag) {
            return NextReq(HeapId::reservation);
          }
        }

        // nothing scheduled; make sure we re-run when next
        // reservation item or next limited item comes up

        Time next_call = TimeMax;
        if (resv_heap.top().has_request()) {
          next_call =
            min_not_0_time(next_call,
                           resv_heap.top().next_request().tag.reservation);
        }
        if (limit_heap.top().has_request()) {
          const auto& next = limit_heap.top().next_request();
          // a ready request at the top of the limit heap should only
          // happen when its proportion tag is maxed out
          assert(!next.tag.ready || max_tag == next.tag.proportion);
          next_call = min_not_0_time(next_call, next.tag.limit);
        }
        if (next_call < TimeMax) {
          return NextReq(next_call);
        } else {
          return NextReq::none();
        }
      } // do_next_request
1184
1185
1186 // if possible is not zero and less than current then return it;
1187 // otherwise return current; the idea is we're trying to find
1188 // the minimal time but ignoring zero
1189 static inline const Time& min_not_0_time(const Time& current,
1190 const Time& possible) {
1191 return TimeZero == possible ? current : std::min(current, possible);
1192 }
1193
1194
      /*
       * This is being called regularly by RunEvery. Every time it's
       * called it notes the time and delta counter (mark point) in a
       * deque. It also looks at the deque to find the most recent
       * mark point that is older than clean_age. It then walks the
       * map and delete all server entries that were last used before
       * that mark point.
       */
      void do_clean() {
        TimePoint now = std::chrono::steady_clock::now();
        DataGuard g(data_mtx);
        clean_mark_points.emplace_back(MarkPoint(now, tick));

        // first erase the super-old client records

        // advance last_erase_point past every mark older than erase_age.
        // NOTE(review): the loop re-reads front() after each pop; the
        // mark just emplaced (timestamped `now`) keeps the deque from
        // draining completely -- assumes erase_age > 0; confirm.
        Counter erase_point = last_erase_point;
        auto point = clean_mark_points.front();
        while (point.first <= now - erase_age) {
          last_erase_point = point.second;
          erase_point = last_erase_point;
          clean_mark_points.pop_front();
          point = clean_mark_points.front();
        }

        // find the newest mark older than idle_age; clients not seen
        // since then will be flagged idle rather than erased
        Counter idle_point = 0;
        for (auto i : clean_mark_points) {
          if (i.first <= now - idle_age) {
            idle_point = i.second;
          } else {
            break;
          }
        }

        Counter erased_num = 0;
        if (erase_point > 0 || idle_point > 0) {
          for (auto i = client_map.begin(); i != client_map.end(); /* empty */) {
            // advance before a potential erase so the iterator stays valid
            auto i2 = i++;
            if (erase_point &&
                erased_num < erase_max &&
                i2->second->last_tick <= erase_point) {
              delete_from_heaps(i2->second);
              client_map.erase(i2);
              erased_num++;
            } else if (idle_point && i2->second->last_tick <= idle_point) {
              i2->second->idle = true;
            }
          } // for

          // if we hit the per-pass erase cap, re-run sooner to finish
          // the backlog; otherwise return to the normal cadence
          auto wperiod = check_time;
          if (erased_num >= erase_max) {
            wperiod = duration_cast<milliseconds>(aggressive_check_time);
          } else {
            // clean finished, refresh
            last_erase_point = 0;
          }
          cleaning_job->try_update(wperiod);
        } // if
      } // do_clean
1253
1254
1255 // data_mtx must be held by caller
1256 template<IndIntruHeapData ClientRec::*C1,typename C2>
1257 void delete_from_heap(ClientRecRef& client,
1258 c::IndIntruHeap<ClientRecRef,ClientRec,C1,C2,B>& heap) {
11fdf7f2 1259 auto i = heap.at(client);
7c673cae
FG
1260 heap.remove(i);
1261 }
1262
1263
      // data_mtx must be held by caller
      //
      // Remove the client's record from every scheduling heap it
      // participates in (the proportion heap only when compiled in).
      void delete_from_heaps(ClientRecRef& client) {
        delete_from_heap(client, resv_heap);
#if USE_PROP_HEAP
        delete_from_heap(client, prop_heap);
#endif
        delete_from_heap(client, limit_heap);
        delete_from_heap(client, ready_heap);
      }
1273 }; // class PriorityQueueBase
1274
1275
11fdf7f2
TL
1276 template<typename C, typename R, bool IsDelayed=false, bool U1=false, uint B=2>
1277 class PullPriorityQueue : public PriorityQueueBase<C,R,IsDelayed,U1,B> {
1278 using super = PriorityQueueBase<C,R,IsDelayed,U1,B>;
7c673cae
FG
1279
1280 public:
1281
1282 // When a request is pulled, this is the return type.
1283 struct PullReq {
1284 struct Retn {
11fdf7f2
TL
1285 C client;
1286 typename super::RequestRef request;
1287 PhaseType phase;
1288 Cost cost;
7c673cae
FG
1289 };
1290
1291 typename super::NextReqType type;
1292 boost::variant<Retn,Time> data;
1293
1294 bool is_none() const { return type == super::NextReqType::none; }
1295
1296 bool is_retn() const { return type == super::NextReqType::returning; }
1297 Retn& get_retn() {
1298 return boost::get<Retn>(data);
1299 }
1300
1301 bool is_future() const { return type == super::NextReqType::future; }
1302 Time getTime() const { return boost::get<Time>(data); }
1303 };
1304
1305
1306#ifdef PROFILE
1307 ProfileTimer<std::chrono::nanoseconds> pull_request_timer;
1308 ProfileTimer<std::chrono::nanoseconds> add_request_timer;
1309#endif
1310
1311 template<typename Rep, typename Per>
1312 PullPriorityQueue(typename super::ClientInfoFunc _client_info_f,
1313 std::chrono::duration<Rep,Per> _idle_age,
1314 std::chrono::duration<Rep,Per> _erase_age,
1315 std::chrono::duration<Rep,Per> _check_time,
11fdf7f2
TL
1316 AtLimitParam at_limit_param = AtLimit::Wait,
1317 double _anticipation_timeout = 0.0) :
7c673cae
FG
1318 super(_client_info_f,
1319 _idle_age, _erase_age, _check_time,
11fdf7f2 1320 at_limit_param, _anticipation_timeout)
7c673cae
FG
1321 {
1322 // empty
1323 }
1324
1325
1326 // pull convenience constructor
1327 PullPriorityQueue(typename super::ClientInfoFunc _client_info_f,
11fdf7f2
TL
1328 AtLimitParam at_limit_param = AtLimit::Wait,
1329 double _anticipation_timeout = 0.0) :
7c673cae 1330 PullPriorityQueue(_client_info_f,
11fdf7f2
TL
1331 standard_idle_age,
1332 standard_erase_age,
1333 standard_check_time,
1334 at_limit_param,
1335 _anticipation_timeout)
7c673cae
FG
1336 {
1337 // empty
1338 }
1339
1340
11fdf7f2
TL
1341 int add_request(R&& request,
1342 const C& client_id,
1343 const ReqParams& req_params,
1344 const Cost cost = 1u) {
1345 return add_request(typename super::RequestRef(new R(std::move(request))),
1346 client_id,
1347 req_params,
1348 get_time(),
1349 cost);
7c673cae
FG
1350 }
1351
1352
11fdf7f2
TL
1353 int add_request(R&& request,
1354 const C& client_id,
1355 const Cost cost = 1u) {
7c673cae 1356 static const ReqParams null_req_params;
11fdf7f2
TL
1357 return add_request(typename super::RequestRef(new R(std::move(request))),
1358 client_id,
1359 null_req_params,
1360 get_time(),
1361 cost);
7c673cae
FG
1362 }
1363
1364
11fdf7f2
TL
1365 int add_request_time(R&& request,
1366 const C& client_id,
1367 const ReqParams& req_params,
1368 const Time time,
1369 const Cost cost = 1u) {
1370 return add_request(typename super::RequestRef(new R(std::move(request))),
1371 client_id,
1372 req_params,
1373 time,
1374 cost);
7c673cae
FG
1375 }
1376
1377
11fdf7f2
TL
1378 int add_request(typename super::RequestRef&& request,
1379 const C& client_id,
1380 const ReqParams& req_params,
1381 const Cost cost = 1u) {
1382 return add_request(request, req_params, client_id, get_time(), cost);
7c673cae
FG
1383 }
1384
1385
11fdf7f2
TL
1386 int add_request(typename super::RequestRef&& request,
1387 const C& client_id,
1388 const Cost cost = 1u) {
7c673cae 1389 static const ReqParams null_req_params;
11fdf7f2 1390 return add_request(request, null_req_params, client_id, get_time(), cost);
7c673cae
FG
1391 }
1392
1393
1394 // this does the work; the versions above provide alternate interfaces
11fdf7f2
TL
1395 int add_request(typename super::RequestRef&& request,
1396 const C& client_id,
1397 const ReqParams& req_params,
1398 const Time time,
1399 const Cost cost = 1u) {
7c673cae
FG
1400 typename super::DataGuard g(this->data_mtx);
1401#ifdef PROFILE
1402 add_request_timer.start();
1403#endif
11fdf7f2
TL
1404 int r = super::do_add_request(std::move(request),
1405 client_id,
1406 req_params,
1407 time,
1408 cost);
7c673cae
FG
1409 // no call to schedule_request for pull version
1410#ifdef PROFILE
1411 add_request_timer.stop();
1412#endif
11fdf7f2 1413 return r;
7c673cae
FG
1414 }
1415
1416
1417 inline PullReq pull_request() {
1418 return pull_request(get_time());
1419 }
1420
1421
11fdf7f2 1422 PullReq pull_request(const Time now) {
7c673cae
FG
1423 PullReq result;
1424 typename super::DataGuard g(this->data_mtx);
1425#ifdef PROFILE
1426 pull_request_timer.start();
1427#endif
1428
1429 typename super::NextReq next = super::do_next_request(now);
1430 result.type = next.type;
1431 switch(next.type) {
1432 case super::NextReqType::none:
1433 return result;
7c673cae
FG
1434 case super::NextReqType::future:
1435 result.data = next.when_ready;
1436 return result;
7c673cae
FG
1437 case super::NextReqType::returning:
1438 // to avoid nesting, break out and let code below handle this case
1439 break;
1440 default:
1441 assert(false);
1442 }
1443
1444 // we'll only get here if we're returning an entry
1445
1446 auto process_f =
1447 [&] (PullReq& pull_result, PhaseType phase) ->
1448 std::function<void(const C&,
11fdf7f2 1449 uint64_t,
7c673cae
FG
1450 typename super::RequestRef&)> {
1451 return [&pull_result, phase](const C& client,
11fdf7f2 1452 const Cost request_cost,
7c673cae 1453 typename super::RequestRef& request) {
11fdf7f2
TL
1454 pull_result.data = typename PullReq::Retn{ client,
1455 std::move(request),
1456 phase,
1457 request_cost };
7c673cae
FG
1458 };
1459 };
1460
1461 switch(next.heap_id) {
1462 case super::HeapId::reservation:
11fdf7f2
TL
1463 (void) super::pop_process_request(this->resv_heap,
1464 process_f(result,
1465 PhaseType::reservation));
7c673cae
FG
1466 ++this->reserv_sched_count;
1467 break;
1468 case super::HeapId::ready:
11fdf7f2
TL
1469 {
1470 auto tag = super::pop_process_request(this->ready_heap,
7c673cae 1471 process_f(result, PhaseType::priority));
11fdf7f2 1472 // need to use retn temporarily
7c673cae 1473 auto& retn = boost::get<typename PullReq::Retn>(result.data);
11fdf7f2 1474 super::reduce_reservation_tags(retn.client, tag);
7c673cae
FG
1475 }
1476 ++this->prop_sched_count;
1477 break;
1478 default:
1479 assert(false);
1480 }
1481
1482#ifdef PROFILE
1483 pull_request_timer.stop();
1484#endif
1485 return result;
1486 } // pull_request
1487
1488
1489 protected:
1490
1491
1492 // data_mtx should be held when called; unfortunately this
1493 // function has to be repeated in both push & pull
1494 // specializations
1495 typename super::NextReq next_request() {
1496 return next_request(get_time());
1497 }
1498 }; // class PullPriorityQueue
1499
1500
1501 // PUSH version
11fdf7f2
TL
1502 template<typename C, typename R, bool IsDelayed=false, bool U1=false, uint B=2>
1503 class PushPriorityQueue : public PriorityQueueBase<C,R,IsDelayed,U1,B> {
7c673cae
FG
1504
1505 protected:
1506
11fdf7f2 1507 using super = PriorityQueueBase<C,R,IsDelayed,U1,B>;
7c673cae
FG
1508
1509 public:
1510
1511 // a function to see whether the server can handle another request
1512 using CanHandleRequestFunc = std::function<bool(void)>;
1513
1514 // a function to submit a request to the server; the second
1515 // parameter is a callback when it's completed
1516 using HandleRequestFunc =
11fdf7f2 1517 std::function<void(const C&,typename super::RequestRef,PhaseType,uint64_t)>;
7c673cae
FG
1518
1519 protected:
1520
1521 CanHandleRequestFunc can_handle_f;
1522 HandleRequestFunc handle_f;
1523 // for handling timed scheduling
1524 std::mutex sched_ahead_mtx;
1525 std::condition_variable sched_ahead_cv;
1526 Time sched_ahead_when = TimeZero;
1527
1528#ifdef PROFILE
1529 public:
1530 ProfileTimer<std::chrono::nanoseconds> add_request_timer;
1531 ProfileTimer<std::chrono::nanoseconds> request_complete_timer;
1532 protected:
1533#endif
1534
1535 // NB: threads declared last, so constructed last and destructed first
1536
1537 std::thread sched_ahead_thd;
1538
1539 public:
1540
1541 // push full constructor
1542 template<typename Rep, typename Per>
1543 PushPriorityQueue(typename super::ClientInfoFunc _client_info_f,
1544 CanHandleRequestFunc _can_handle_f,
1545 HandleRequestFunc _handle_f,
1546 std::chrono::duration<Rep,Per> _idle_age,
1547 std::chrono::duration<Rep,Per> _erase_age,
1548 std::chrono::duration<Rep,Per> _check_time,
11fdf7f2
TL
1549 AtLimitParam at_limit_param = AtLimit::Wait,
1550 double anticipation_timeout = 0.0) :
7c673cae
FG
1551 super(_client_info_f,
1552 _idle_age, _erase_age, _check_time,
11fdf7f2 1553 at_limit_param, anticipation_timeout)
7c673cae
FG
1554 {
1555 can_handle_f = _can_handle_f;
1556 handle_f = _handle_f;
1557 sched_ahead_thd = std::thread(&PushPriorityQueue::run_sched_ahead, this);
1558 }
1559
1560
1561 // push convenience constructor
1562 PushPriorityQueue(typename super::ClientInfoFunc _client_info_f,
1563 CanHandleRequestFunc _can_handle_f,
1564 HandleRequestFunc _handle_f,
11fdf7f2
TL
1565 AtLimitParam at_limit_param = AtLimit::Wait,
1566 double _anticipation_timeout = 0.0) :
7c673cae
FG
1567 PushPriorityQueue(_client_info_f,
1568 _can_handle_f,
1569 _handle_f,
11fdf7f2
TL
1570 standard_idle_age,
1571 standard_erase_age,
1572 standard_check_time,
1573 at_limit_param,
1574 _anticipation_timeout)
7c673cae
FG
1575 {
1576 // empty
1577 }
1578
1579
1580 ~PushPriorityQueue() {
1581 this->finishing = true;
1582 sched_ahead_cv.notify_one();
1583 sched_ahead_thd.join();
1584 }
1585
1586 public:
1587
11fdf7f2
TL
1588 int add_request(R&& request,
1589 const C& client_id,
1590 const ReqParams& req_params,
1591 const Cost cost = 1u) {
1592 return add_request(typename super::RequestRef(new R(std::move(request))),
1593 client_id,
1594 req_params,
1595 get_time(),
1596 cost);
7c673cae
FG
1597 }
1598
1599
11fdf7f2
TL
1600 int add_request(typename super::RequestRef&& request,
1601 const C& client_id,
1602 const ReqParams& req_params,
1603 const Cost cost = 1u) {
1604 return add_request(request, req_params, client_id, get_time(), cost);
7c673cae
FG
1605 }
1606
1607
11fdf7f2
TL
1608 int add_request_time(const R& request,
1609 const C& client_id,
1610 const ReqParams& req_params,
1611 const Time time,
1612 const Cost cost = 1u) {
1613 return add_request(typename super::RequestRef(new R(request)),
1614 client_id,
1615 req_params,
1616 time,
1617 cost);
7c673cae
FG
1618 }
1619
1620
11fdf7f2
TL
1621 int add_request(typename super::RequestRef&& request,
1622 const C& client_id,
1623 const ReqParams& req_params,
1624 const Time time,
1625 const Cost cost = 1u) {
7c673cae
FG
1626 typename super::DataGuard g(this->data_mtx);
1627#ifdef PROFILE
1628 add_request_timer.start();
1629#endif
11fdf7f2
TL
1630 int r = super::do_add_request(std::move(request),
1631 client_id,
1632 req_params,
1633 time,
1634 cost);
1635 if (r == 0) {
1636 schedule_request();
1637 }
7c673cae
FG
1638#ifdef PROFILE
1639 add_request_timer.stop();
1640#endif
11fdf7f2 1641 return r;
7c673cae
FG
1642 }
1643
1644
1645 void request_completed() {
1646 typename super::DataGuard g(this->data_mtx);
1647#ifdef PROFILE
1648 request_complete_timer.start();
1649#endif
1650 schedule_request();
1651#ifdef PROFILE
1652 request_complete_timer.stop();
1653#endif
1654 }
1655
1656 protected:
1657
1658 // data_mtx should be held when called; furthermore, the heap
1659 // should not be empty and the top element of the heap should
1660 // not be already handled
1661 //
1662 // NOTE: the use of "super::ClientRec" in either the template
1663 // construct or as a parameter to submit_top_request generated
1664 // a compiler error in g++ 4.8.4, when ClientRec was
1665 // "protected" rather than "public". By g++ 6.3.1 this was not
1666 // an issue. But for backwards compatibility
1667 // PriorityQueueBase::ClientRec is public.
1668 template<typename C1,
1669 IndIntruHeapData super::ClientRec::*C2,
1670 typename C3,
1671 uint B4>
11fdf7f2
TL
1672 typename super::RequestMeta
1673 submit_top_request(IndIntruHeap<C1,typename super::ClientRec,C2,C3,B4>& heap,
1674 PhaseType phase) {
7c673cae 1675 C client_result;
11fdf7f2 1676 RequestTag tag = super::pop_process_request(heap,
7c673cae
FG
1677 [this, phase, &client_result]
1678 (const C& client,
11fdf7f2 1679 const Cost request_cost,
7c673cae
FG
1680 typename super::RequestRef& request) {
1681 client_result = client;
11fdf7f2 1682 handle_f(client, std::move(request), phase, request_cost);
7c673cae 1683 });
11fdf7f2
TL
1684 typename super::RequestMeta req(client_result, tag);
1685 return req;
7c673cae
FG
1686 }
1687
1688
1689 // data_mtx should be held when called
1690 void submit_request(typename super::HeapId heap_id) {
7c673cae
FG
1691 switch(heap_id) {
1692 case super::HeapId::reservation:
1693 // don't need to note client
1694 (void) submit_top_request(this->resv_heap, PhaseType::reservation);
1695 // unlike the other two cases, we do not reduce reservation
1696 // tags here
1697 ++this->reserv_sched_count;
1698 break;
1699 case super::HeapId::ready:
11fdf7f2
TL
1700 {
1701 auto req = submit_top_request(this->ready_heap, PhaseType::priority);
1702 super::reduce_reservation_tags(req.client_id, req.tag);
1703 }
7c673cae
FG
1704 ++this->prop_sched_count;
1705 break;
1706 default:
1707 assert(false);
1708 }
1709 } // submit_request
1710
1711
1712 // data_mtx should be held when called; unfortunately this
1713 // function has to be repeated in both push & pull
1714 // specializations
1715 typename super::NextReq next_request() {
1716 return next_request(get_time());
1717 }
1718
1719
1720 // data_mtx should be held when called; overrides member
1721 // function in base class to add check for whether a request can
1722 // be pushed to the server
1723 typename super::NextReq next_request(Time now) {
1724 if (!can_handle_f()) {
1725 typename super::NextReq result;
1726 result.type = super::NextReqType::none;
1727 return result;
1728 } else {
1729 return super::do_next_request(now);
1730 }
1731 } // next_request
1732
1733
1734 // data_mtx should be held when called
1735 void schedule_request() {
1736 typename super::NextReq next_req = next_request();
1737 switch (next_req.type) {
1738 case super::NextReqType::none:
1739 return;
1740 case super::NextReqType::future:
1741 sched_at(next_req.when_ready);
1742 break;
1743 case super::NextReqType::returning:
1744 submit_request(next_req.heap_id);
1745 break;
1746 default:
1747 assert(false);
1748 }
1749 }
1750
1751
1752 // this is the thread that handles running schedule_request at
1753 // future times when nothing can be scheduled immediately
1754 void run_sched_ahead() {
1755 std::unique_lock<std::mutex> l(sched_ahead_mtx);
1756
1757 while (!this->finishing) {
1758 if (TimeZero == sched_ahead_when) {
1759 sched_ahead_cv.wait(l);
1760 } else {
1761 Time now;
1762 while (!this->finishing && (now = get_time()) < sched_ahead_when) {
1763 long microseconds_l = long(1 + 1000000 * (sched_ahead_when - now));
1764 auto microseconds = std::chrono::microseconds(microseconds_l);
1765 sched_ahead_cv.wait_for(l, microseconds);
1766 }
1767 sched_ahead_when = TimeZero;
1768 if (this->finishing) return;
1769
1770 l.unlock();
1771 if (!this->finishing) {
1772 typename super::DataGuard g(this->data_mtx);
1773 schedule_request();
1774 }
1775 l.lock();
1776 }
1777 }
1778 }
1779
1780
1781 void sched_at(Time when) {
1782 std::lock_guard<std::mutex> l(sched_ahead_mtx);
31f18b77 1783 if (this->finishing) return;
7c673cae
FG
1784 if (TimeZero == sched_ahead_when || when < sched_ahead_when) {
1785 sched_ahead_when = when;
1786 sched_ahead_cv.notify_one();
1787 }
1788 }
1789 }; // class PushPriorityQueue
1790
1791 } // namespace dmclock
1792} // namespace crimson