]> git.proxmox.com Git - ceph.git/blame - ceph/src/dmclock/src/dmclock_server.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / dmclock / src / dmclock_server.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4/*
5 * Copyright (C) 2017 Red Hat Inc.
11fdf7f2
TL
6 *
7 * Author: J. Eric Ivancich <ivancich@redhat.com>
8 *
9 * This is free software; you can redistribute it and/or modify it
10 * under the terms of the GNU Lesser General Public License version
11 * 2.1, as published by the Free Software Foundation. See file
12 * COPYING.
7c673cae
FG
13 */
14
15
16#pragma once
17
18/* COMPILATION OPTIONS
7c673cae
FG
19 *
20 * The prop_heap does not seem to be necessary. The only thing it
11fdf7f2 21 * would help with is quickly finding the minimum proportion/prioity
7c673cae
FG
22 * when an idle client became active. To have the code maintain the
23 * proportional heap, define USE_PROP_HEAP (i.e., compiler argument
24 * -DUSE_PROP_HEAP).
25 */
26
27#include <assert.h>
28
29#include <cmath>
30#include <memory>
31#include <map>
32#include <deque>
33#include <queue>
20effc67 34#ifndef WITH_SEASTAR
7c673cae
FG
35#include <atomic>
36#include <mutex>
37#include <condition_variable>
38#include <thread>
20effc67 39#endif
7c673cae
FG
40#include <iostream>
41#include <sstream>
42#include <limits>
43
44#include <boost/variant.hpp>
45
46#include "indirect_intrusive_heap.h"
9f95a23c 47#include "../support/src/run_every.h"
7c673cae
FG
48#include "dmclock_util.h"
49#include "dmclock_recs.h"
50
51#ifdef PROFILE
52#include "profile.h"
53#endif
54
7c673cae
FG
55
56namespace crimson {
57
58 namespace dmclock {
59
60 namespace c = crimson;
61
    // Tag values are doubles; use +/- infinity as the "unbounded" sentinels
    // when IEEE-754 is available, otherwise fall back to max/lowest.
    constexpr double max_tag = std::numeric_limits<double>::is_iec559 ?
      std::numeric_limits<double>::infinity() :
      std::numeric_limits<double>::max();
    constexpr double min_tag = std::numeric_limits<double>::is_iec559 ?
      -std::numeric_limits<double>::infinity() :
      std::numeric_limits<double>::lowest();
    // modulus used when formatting tag values for human-readable display
    constexpr unsigned tag_modulo = 1000000;

    // default ages/intervals used by the background cleaning job that
    // marks clients idle and eventually erases them
    constexpr auto standard_idle_age  = std::chrono::seconds(300);
    constexpr auto standard_erase_age = std::chrono::seconds(600);
    constexpr auto standard_check_time = std::chrono::seconds(60);
    constexpr auto aggressive_check_time = std::chrono::seconds(5);
    // upper bound on the number of idle clients erased per cleaning pass
    constexpr unsigned standard_erase_max = 2000;

    // policy for what to do with a request whose client is at its limit
    enum class AtLimit {
      // requests are delayed until the limit is restored
      Wait,
      // requests are allowed to exceed their limit, if all other reservations
      // are met and below their limits
      Allow,
      // if an incoming request would exceed its limit, add_request() will
      // reject it with EAGAIN instead of adding it to the queue. cannot be used
      // with DelayedTagCalc, because add_request() needs an accurate tag
      Reject,
    };

    // when AtLimit::Reject is used, only start rejecting requests once their
    // limit is above this threshold. requests under this threshold are
    // enqueued and processed like AtLimit::Wait
    using RejectThreshold = Time;

    // the AtLimit constructor parameter can either accept AtLimit or a value
    // for RejectThreshold (which implies AtLimit::Reject)
    using AtLimitParam = boost::variant<AtLimit, RejectThreshold>;
96
7c673cae 97 struct ClientInfo {
11fdf7f2
TL
98 double reservation; // minimum
99 double weight; // proportional
100 double limit; // maximum
7c673cae
FG
101
102 // multiplicative inverses of above, which we use in calculations
103 // and don't want to recalculate repeatedly
11fdf7f2
TL
104 double reservation_inv;
105 double weight_inv;
106 double limit_inv;
7c673cae
FG
107
108 // order parameters -- min, "normal", max
9f95a23c
TL
109 ClientInfo(double _reservation, double _weight, double _limit) {
110 update(_reservation, _weight, _limit);
111 }
112
113 inline void update(double _reservation, double _weight, double _limit) {
114 reservation = _reservation;
115 weight = _weight;
116 limit = _limit;
117 reservation_inv = (0.0 == reservation) ? 0.0 : 1.0 / reservation;
118 weight_inv = (0.0 == weight) ? 0.0 : 1.0 / weight;
119 limit_inv = (0.0 == limit) ? 0.0 : 1.0 / limit;
7c673cae 120 }
7c673cae
FG
121
122 friend std::ostream& operator<<(std::ostream& out,
123 const ClientInfo& client) {
124 out <<
125 "{ ClientInfo:: r:" << client.reservation <<
126 " w:" << std::fixed << client.weight <<
127 " l:" << std::fixed << client.limit <<
128 " 1/r:" << std::fixed << client.reservation_inv <<
129 " 1/w:" << std::fixed << client.weight_inv <<
130 " 1/l:" << std::fixed << client.limit_inv <<
131 " }";
132 return out;
133 }
134 }; // class ClientInfo
135
136
    // The dmClock tag attached to each request: one virtual timestamp per
    // scheduling dimension (reservation, proportion/weight, limit), plus
    // the distributed-mode bookkeeping (delta/rho) and the request cost.
    struct RequestTag {
      double reservation; // reservation-based virtual time
      double proportion;  // weight-based virtual time
      double limit;       // limit-based virtual time
      uint32_t delta;     // # responses (any kind) seen by client since last req
      uint32_t rho;       // # reservation responses seen since last request
      Cost cost;          // cost of this request; scales tag increments
      bool ready;         // true when within limit
      Time arrival;       // wall-clock arrival time of the request

      // Compute a fresh tag from the client's previous tag and its QoS
      // parameters, per the dmClock tag formulas.
      RequestTag(const RequestTag& prev_tag,
		 const ClientInfo& client,
		 const uint32_t _delta,
		 const uint32_t _rho,
		 const Time time,
		 const Cost _cost = 1u,
		 const double anticipation_timeout = 0.0) :
	delta(_delta),
	rho(_rho),
	cost(_cost),
	ready(false),
	arrival(time)
      {
	assert(cost > 0);
	// If this request arrived within the anticipation window of the
	// previous one, back-date the base time so closely spaced requests
	// are treated as a continuous stream rather than restarting tags
	// at "now".
	Time max_time = time;
	if (time - anticipation_timeout < prev_tag.arrival)
	  max_time -= anticipation_timeout;

	reservation = tag_calc(max_time,
			       prev_tag.reservation,
			       client.reservation_inv,
			       rho,   // reservation uses rho
			       true,
			       cost);
	proportion = tag_calc(max_time,
			      prev_tag.proportion,
			      client.weight_inv,
			      delta, // proportion/limit use delta
			      true,
			      cost);
	limit = tag_calc(max_time,
			 prev_tag.limit,
			 client.limit_inv,
			 delta,
			 false,
			 cost);

	// at least one of reservation/weight must be set, otherwise the
	// request could never be scheduled
	assert(reservation < max_tag || proportion < max_tag);
      }

      // Convenience overload taking delta/rho bundled in ReqParams.
      RequestTag(const RequestTag& prev_tag,
		 const ClientInfo& client,
		 const ReqParams req_params,
		 const Time time,
		 const Cost cost = 1u,
		 const double anticipation_timeout = 0.0) :
	RequestTag(prev_tag, client, req_params.delta, req_params.rho, time,
		   cost, anticipation_timeout)
      { /* empty */ }

      // Construct a tag directly from explicit component values (used for
      // initial/sentinel tags).
      RequestTag(const double _res, const double _prop, const double _lim,
		 const Time _arrival,
		 const uint32_t _delta = 0,
		 const uint32_t _rho = 0,
		 const Cost _cost = 1u) :
	reservation(_res),
	proportion(_prop),
	limit(_lim),
	delta(_delta),
	rho(_rho),
	cost(_cost),
	ready(false),
	arrival(_arrival)
      {
	assert(cost > 0);
	assert(reservation < max_tag || proportion < max_tag);
      }

      RequestTag(const RequestTag& other) :
	reservation(other.reservation),
	proportion(other.proportion),
	limit(other.limit),
	delta(other.delta),
	rho(other.rho),
	cost(other.cost),
	ready(other.ready),
	arrival(other.arrival)
      { /* empty */ }

      // "before=>after" (or "same") — used in debug/trace output.
      static std::string format_tag_change(double before, double after) {
	if (before == after) {
	  return std::string("same");
	} else {
	  std::stringstream ss;
	  ss << format_tag(before) << "=>" << format_tag(after);
	  return ss.str();
	}
      }

      // Render a tag value compactly, mapping the sentinels to "max"/"min".
      static std::string format_tag(double value) {
	if (max_tag == value) {
	  return std::string("max");
	} else if (min_tag == value) {
	  return std::string("min");
	} else {
	  return format_time(value, tag_modulo);
	}
      }

    private:

      // Core dmClock tag formula: max(now, prev + increment*(dist+cost)).
      // An increment of 0.0 means the dimension is unset, in which case
      // the appropriate sentinel is returned (max_tag for reservation/
      // proportion, min_tag for limit).
      static double tag_calc(const Time time,
			     const double prev,
			     const double increment,
			     const uint32_t dist_req_val,
			     const bool extreme_is_high,
			     const Cost cost) {
	if (0.0 == increment) {
	  return extreme_is_high ? max_tag : min_tag;
	} else {
	  // insure 64-bit arithmetic before conversion to double
	  double tag_increment = increment * (uint64_t(dist_req_val) + cost);
	  return std::max(time, prev + tag_increment);
	}
      }

      friend std::ostream& operator<<(std::ostream& out,
				      const RequestTag& tag) {
	out <<
	  "{ RequestTag:: ready:" << (tag.ready ? "true" : "false") <<
	  " r:" << format_tag(tag.reservation) <<
	  " p:" << format_tag(tag.proportion) <<
	  " l:" << format_tag(tag.limit) <<
#if 0 // try to resolve this to make sure Time is operator<<'able.
	  " arrival:" << tag.arrival <<
#endif
	  " }";
	return out;
      }
    }; // class RequestTag
277
11fdf7f2
TL
278 // C is client identifier type, R is request type,
279 // IsDelayed controls whether tag calculation is delayed until the request
280 // reaches the front of its queue. This is an optimization over the
281 // originally published dmclock algorithm, allowing it to use the most
282 // recent values of rho and delta.
283 // U1 determines whether to use client information function dynamically,
284 // B is heap branching factor
9f95a23c 285 template<typename C, typename R, bool IsDelayed, bool U1, unsigned B>
7c673cae 286 class PriorityQueueBase {
d2e6a577
FG
287 // we don't want to include gtest.h just for FRIEND_TEST
288 friend class dmclock_server_client_idle_erase_Test;
20effc67 289 friend class dmclock_server_add_req_pushprio_queue_Test;
7c673cae 290
11fdf7f2
TL
291 // types used for tag dispatch to select between implementations
292 using TagCalc = std::integral_constant<bool, IsDelayed>;
293 using DelayedTagCalc = std::true_type;
294 using ImmediateTagCalc = std::false_type;
295
7c673cae
FG
296 public:
297
298 using RequestRef = std::unique_ptr<R>;
299
300 protected:
301
f67539c2
TL
302 using Clock = std::chrono::steady_clock;
303 using TimePoint = Clock::time_point;
7c673cae
FG
304 using Duration = std::chrono::milliseconds;
305 using MarkPoint = std::pair<TimePoint,Counter>;
306
307 enum class ReadyOption {ignore, lowers, raises};
308
309 // forward decl for friend decls
310 template<double RequestTag::*, ReadyOption, bool>
311 struct ClientCompare;
312
      // A single queued request: its tag, the owning client's id, and
      // ownership of the request object itself.
      class ClientReq {
	friend PriorityQueueBase;

	RequestTag tag;
	C client_id;
	RequestRef request; // unique ownership of the caller's request

      public:

	ClientReq(const RequestTag& _tag,
		  const C& _client_id,
		  RequestRef&& _request) :
	  tag(_tag),
	  client_id(_client_id),
	  request(std::move(_request))
	{
	  // empty
	}

	friend std::ostream& operator<<(std::ostream& out, const ClientReq& c) {
	  out << "{ ClientReq:: tag:" << c.tag << " client:" <<
	    c.client_id << " }";
	  return out;
	}
      }; // class ClientReq
338
11fdf7f2
TL
      // Lightweight (client id, tag) pair describing a request without
      // owning it.
      struct RequestMeta {
	C client_id;
	RequestTag tag;

	RequestMeta(const C& _client_id, const RequestTag& _tag) :
	  client_id(_client_id),
	  tag(_tag)
	{
	  // empty
	}
      };
350
7c673cae
FG
351 public:
352
      // NOTE: ClientRec is in the "public" section for compatibility
      // with g++ 4.8.4, which complains if it's not. By g++ 6.3.1
      // ClientRec could be "protected" with no issue. [See comments
      // associated with function submit_top_request.]
      //
      // Per-client state: the client's FIFO of pending requests, its most
      // recent ("previous") tag, idle/cleaning bookkeeping, and the
      // intrusive-heap hook data for each of the scheduling heaps.
      class ClientRec {
	friend PriorityQueueBase<C,R,IsDelayed,U1,B>;

	C client;
	// tag of the most recently (tag-)calculated request; basis for the
	// next tag calculation
	RequestTag prev_tag;
	std::deque<ClientReq> requests;

	// amount added from the proportion tag as a result of
	// an idle client becoming unidle
	double prop_delta = 0.0;

	// intrusive-heap hook data, one per heap this record sits in
	c::IndIntruHeapData reserv_heap_data {};
	c::IndIntruHeapData lim_heap_data {};
	c::IndIntruHeapData ready_heap_data {};
#if USE_PROP_HEAP
	c::IndIntruHeapData prop_heap_data {};
#endif

      public:

	const ClientInfo* info; // cached QoS parameters for this client
	bool idle;              // set by the cleaning job; cleared on new request
	Counter last_tick;      // tick of the last tag update (for idle aging)
	uint32_t cur_rho;       // most recent rho reported by the client
	uint32_t cur_delta;     // most recent delta reported by the client

	ClientRec(C _client,
		  const ClientInfo* _info,
		  Counter current_tick) :
	  client(_client),
	  prev_tag(0.0, 0.0, 0.0, TimeZero),
	  info(_info),
	  idle(true),
	  last_tick(current_tick),
	  cur_rho(1),
	  cur_delta(1)
	{
	  // empty
	}

	inline const RequestTag& get_req_tag() const {
	  return prev_tag;
	}

	// Copy rhs into lhs unless rhs is one of the sentinel values;
	// sentinels ("unset" dimensions) must not overwrite real tags.
	static inline void assign_unpinned_tag(double& lhs, const double rhs) {
	  if (rhs != max_tag && rhs != min_tag) {
	    lhs = rhs;
	  }
	}

	// Record _prev as the client's previous tag (skipping sentinel
	// components) and note the tick for idle/erase aging.
	inline void update_req_tag(const RequestTag& _prev,
				   const Counter& _tick) {
	  assign_unpinned_tag(prev_tag.reservation, _prev.reservation);
	  assign_unpinned_tag(prev_tag.limit, _prev.limit);
	  assign_unpinned_tag(prev_tag.proportion, _prev.proportion);
	  prev_tag.arrival = _prev.arrival;
	  last_tick = _tick;
	}

	inline void add_request(const RequestTag& tag, RequestRef&& request) {
	  requests.emplace_back(tag, client, std::move(request));
	}

	inline const ClientReq& next_request() const {
	  return requests.front();
	}

	inline ClientReq& next_request() {
	  return requests.front();
	}

	inline void pop_request() {
	  requests.pop_front();
	}

	inline bool has_request() const {
	  return !requests.empty();
	}

	inline size_t request_count() const {
	  return requests.size();
	}

	// Remove (front-to-back) every request that filter_accum accepts;
	// the filter takes ownership of accepted requests.
	// NB: because a deque is the underlying structure, this
	// operation might be expensive
	bool remove_by_req_filter_fw(std::function<bool(RequestRef&&)> filter_accum) {
	  bool any_removed = false;
	  for (auto i = requests.begin();
	       i != requests.end();
	       /* no inc */) {
	    if (filter_accum(std::move(i->request))) {
	      any_removed = true;
	      i = requests.erase(i);
	    } else {
	      ++i;
	    }
	  }
	  return any_removed;
	}

	// Same as remove_by_req_filter_fw but visits back-to-front.
	// NB: because a deque is the underlying structure, this
	// operation might be expensive
	bool remove_by_req_filter_bw(std::function<bool(RequestRef&&)> filter_accum) {
	  bool any_removed = false;
	  for (auto i = requests.rbegin();
	       i != requests.rend();
	       /* no inc */) {
	    if (filter_accum(std::move(i->request))) {
	      any_removed = true;
	      // erase via the base iterator, then rebuild the reverse iterator
	      i = decltype(i){ requests.erase(std::next(i).base()) };
	    } else {
	      ++i;
	    }
	  }
	  return any_removed;
	}

	inline bool
	remove_by_req_filter(std::function<bool(RequestRef&&)> filter_accum,
			     bool visit_backwards) {
	  if (visit_backwards) {
	    return remove_by_req_filter_bw(filter_accum);
	  } else {
	    return remove_by_req_filter_fw(filter_accum);
	  }
	}

	friend std::ostream&
	operator<<(std::ostream& out,
		   const typename PriorityQueueBase::ClientRec& e) {
	  out << "{ ClientRec::" <<
	    " client:" << e.client <<
	    " prev_tag:" << e.prev_tag <<
	    " req_count:" << e.requests.size() <<
	    " top_req:";
	  if (e.has_request()) {
	    out << e.next_request();
	  } else {
	    out << "none";
	  }
	  out << " }";

	  return out;
	}
      }; // class ClientRec
502
      using ClientRecRef = std::shared_ptr<ClientRec>;

      // when we try to get the next request, we'll be in one of three
      // situations -- we'll have one to return, have one that can
      // fire in the future, or not have any
      enum class NextReqType { returning, future, none };

      // specifies which queue next request will get popped from
      enum class HeapId { reservation, ready };

      // this is returned from next_req to tell the caller the situation;
      // the union member that is valid is determined by `type`:
      // returning -> heap_id, future -> when_ready, none -> neither
      struct NextReq {
	NextReqType type;
	union {
	  HeapId heap_id;
	  Time when_ready;
	};

	inline explicit NextReq() :
	  type(NextReqType::none)
	{ }

	inline NextReq(HeapId _heap_id) :
	  type(NextReqType::returning),
	  heap_id(_heap_id)
	{ }

	inline NextReq(Time _when_ready) :
	  type(NextReqType::future),
	  when_ready(_when_ready)
	{ }

	// calls to this are clearer than calls to the default
	// constructor
	static inline NextReq none() {
	  return NextReq();
	}
      };


      // a function that can be called to look up client information
      using ClientInfoFunc = std::function<const ClientInfo*(const C&)>;
7c673cae
FG
545
546
      // True when no client has a queued request. Clients with requests
      // sort before clients without in the heaps, so if the top of the
      // reservation heap has no request, no one does.
      bool empty() const {
	DataGuard g(data_mtx);
	return (resv_heap.empty() || ! resv_heap.top().has_request());
      }
551
552
      // Number of tracked clients (idle or not); every client has exactly
      // one entry in the reservation heap.
      size_t client_count() const {
	DataGuard g(data_mtx);
	return resv_heap.size();
      }
557
558
559 size_t request_count() const {
560 DataGuard g(data_mtx);
561 size_t total = 0;
562 for (auto i = resv_heap.cbegin(); i != resv_heap.cend(); ++i) {
563 total += i->request_count();
564 }
565 return total;
566 }
567
568
11fdf7f2 569 bool remove_by_req_filter(std::function<bool(RequestRef&&)> filter_accum,
7c673cae
FG
570 bool visit_backwards = false) {
571 bool any_removed = false;
572 DataGuard g(data_mtx);
573 for (auto i : client_map) {
574 bool modified =
575 i.second->remove_by_req_filter(filter_accum, visit_backwards);
576 if (modified) {
577 resv_heap.adjust(*i.second);
578 limit_heap.adjust(*i.second);
579 ready_heap.adjust(*i.second);
580#if USE_PROP_HEAP
581 prop_heap.adjust(*i.second);
582#endif
583 any_removed = true;
584 }
585 }
586 return any_removed;
587 }
588
589
      // use as a default value when no accumulator is provided; discards
      // the request (its destructor runs when the RequestRef goes out of
      // scope)
      static void request_sink(RequestRef&& req) {
	// do nothing
      }
594
595
      // Remove all queued requests belonging to one client, handing each
      // to `accum` (in forward or reverse queue order). The client record
      // itself stays in the map/heaps; only its queue is emptied.
      void remove_by_client(const C& client,
			    bool reverse = false,
			    std::function<void (RequestRef&&)> accum = request_sink) {
	DataGuard g(data_mtx);

	auto i = client_map.find(client);

	if (i == client_map.end()) return;

	if (reverse) {
	  for (auto j = i->second->requests.rbegin();
	       j != i->second->requests.rend();
	       ++j) {
	    accum(std::move(j->request));
	  }
	} else {
	  for (auto j = i->second->requests.begin();
	       j != i->second->requests.end();
	       ++j) {
	    accum(std::move(j->request));
	  }
	}

	i->second->requests.clear();

	// the client's queue changed, so its position in every heap may too
	resv_heap.adjust(*i->second);
	limit_heap.adjust(*i->second);
	ready_heap.adjust(*i->second);
#if USE_PROP_HEAP
	prop_heap.adjust(*i->second);
#endif
      }
628
629
      // Branching factor of the scheduling heaps (template parameter B).
      unsigned get_heap_branching_factor() const {
	return B;
      }
633
634
11fdf7f2
TL
      // Refresh the cached ClientInfo pointer for one client by re-invoking
      // the lookup function; no-op if the client is unknown.
      void update_client_info(const C& client_id) {
	DataGuard g(data_mtx);
	auto client_it = client_map.find(client_id);
	if (client_map.end() != client_it) {
	  ClientRec& client = (*client_it->second);
	  client.info = client_info_f(client_id);
	}
      }
643
644
645 void update_client_infos() {
646 DataGuard g(data_mtx);
647 for (auto i : client_map) {
648 i.second->info = client_info_f(i.second->client);
649 }
650 }
651
652
7c673cae
FG
      // Debug dump of the whole queue: every client record plus the top
      // of each heap. Locks data_mtx for a consistent snapshot.
      friend std::ostream& operator<<(std::ostream& out,
				      const PriorityQueueBase& q) {
	std::lock_guard<decltype(q.data_mtx)> guard(q.data_mtx);

	out << "{ PriorityQueue::";
	for (const auto& c : q.client_map) {
	  out << " { client:" << c.first << ", record:" << *c.second <<
	    " }";
	}
	// the heaps all contain the same clients, so one emptiness check
	// covers all three tops
	if (!q.resv_heap.empty()) {
	  const auto& resv = q.resv_heap.top();
	  out << " { reservation_top:" << resv << " }";
	  const auto& ready = q.ready_heap.top();
	  out << " { ready_top:" << ready << " }";
	  const auto& limit = q.limit_heap.top();
	  out << " { limit_top:" << limit << " }";
	} else {
	  out << " HEAPS-EMPTY";
	}
	out << " }";

	return out;
      }
676
      // for debugging -- dump each heap in sorted order; individual heaps
      // can be suppressed via the show_* flags
      void display_queues(std::ostream& out,
			  bool show_res = true,
			  bool show_lim = true,
			  bool show_ready = true,
			  bool show_prop = true) const {
	// accept every client record
	auto filter = [](const ClientRec& e)->bool { return true; };
	DataGuard g(data_mtx);
	if (show_res) {
	  resv_heap.display_sorted(out << "RESER:", filter);
	}
	if (show_lim) {
	  limit_heap.display_sorted(out << "LIMIT:", filter);
	}
	if (show_ready) {
	  ready_heap.display_sorted(out << "READY:", filter);
	}
#if USE_PROP_HEAP
	if (show_prop) {
	  prop_heap.display_sorted(out << "PROPO:", filter);
	}
#endif
      } // display_queues
700
701
702 protected:
703
      // The ClientCompare functor is essentially doing a precedes?
      // operator, returning true if and only if the first parameter
      // must precede the second parameter. If the second must precede
      // the first, or if they are equivalent, false should be
      // returned. The reason for this behavior is that it will be
      // called to test if two items are out of order and if true is
      // returned it will reverse the items. Therefore false is the
      // default return when it doesn't matter to prevent unnecessary
      // re-ordering.
      //
      // The template is supporting variations in sorting based on the
      // heap in question and allowing these variations to be handled
      // at compile-time.
      //
      // tag_field determines which tag is being used for comparison
      //
      // ready_opt determines how the ready flag influences the sort
      //
      // use_prop_delta determines whether the proportional delta is
      // added in for comparison
      template<double RequestTag::*tag_field,
	       ReadyOption ready_opt,
	       bool use_prop_delta>
      struct ClientCompare {
	bool operator()(const ClientRec& n1, const ClientRec& n2) const {
	  if (n1.has_request()) {
	    if (n2.has_request()) {
	      const auto& t1 = n1.next_request().tag;
	      const auto& t2 = n2.next_request().tag;
	      if (ReadyOption::ignore == ready_opt || t1.ready == t2.ready) {
		// if we don't care about ready or the ready values are the same
		if (use_prop_delta) {
		  return (t1.*tag_field + n1.prop_delta) <
		    (t2.*tag_field + n2.prop_delta);
		} else {
		  return t1.*tag_field < t2.*tag_field;
		}
	      } else if (ReadyOption::raises == ready_opt) {
		// use_ready == true && the ready fields are different
		return t1.ready;
	      } else {
		return t2.ready;
	      }
	    } else {
	      // n1 has request but n2 does not
	      return true;
	    }
	  } else if (n2.has_request()) {
	    // n2 has request but n1 does not
	    return false;
	  } else {
	    // both have none; keep stable w false
	    return false;
	  }
	}
      };
760
11fdf7f2
TL
      // lookup function supplying per-client QoS parameters
      ClientInfoFunc client_info_f;
      // when true (template parameter U1) client_info_f is consulted on
      // every access instead of using the cached pointer
      static constexpr bool is_dynamic_cli_info_f = U1;

#ifdef WITH_SEASTAR
      // single-reactor build: no locking needed, stub out the guard
      static constexpr int data_mtx = 0;
      struct DataGuard { DataGuard(int) {} };
#else
      mutable std::mutex data_mtx;
      using DataGuard = std::lock_guard<decltype(data_mtx)>;
#endif

      // stable mapping between client ids and client queues
      std::map<C,ClientRecRef> client_map;

      // one intrusive heap per scheduling dimension; all hold the same
      // ClientRec set, each ordered by a different tag/policy
      c::IndIntruHeap<ClientRecRef,
		      ClientRec,
		      &ClientRec::reserv_heap_data,
		      ClientCompare<&RequestTag::reservation,
				    ReadyOption::ignore,
				    false>,
		      B> resv_heap;
#if USE_PROP_HEAP
      c::IndIntruHeap<ClientRecRef,
		      ClientRec,
		      &ClientRec::prop_heap_data,
		      ClientCompare<&RequestTag::proportion,
				    ReadyOption::ignore,
				    true>,
		      B> prop_heap;
#endif
      c::IndIntruHeap<ClientRecRef,
		      ClientRec,
		      &ClientRec::lim_heap_data,
		      ClientCompare<&RequestTag::limit,
				    ReadyOption::lowers,
				    false>,
		      B> limit_heap;
      c::IndIntruHeap<ClientRecRef,
		      ClientRec,
		      &ClientRec::ready_heap_data,
		      ClientCompare<&RequestTag::proportion,
				    ReadyOption::raises,
				    true>,
		      B> ready_heap;

      // at-limit policy and (for AtLimit::Reject) the grace threshold
      AtLimit at_limit;
      RejectThreshold reject_threshold = 0;

      // window within which back-to-back requests share a tag base
      double anticipation_timeout;
#ifdef WITH_SEASTAR
      bool finishing;
#else
      std::atomic_bool finishing;
#endif
      // every request creates a tick
      Counter tick = 0;

      // performance data collection
      size_t reserv_sched_count = 0;
      size_t prop_sched_count = 0;
      size_t limit_break_sched_count = 0;

      // idle/erase aging parameters for the cleaning job
      Duration idle_age;
      Duration erase_age;
      Duration check_time;
      std::deque<MarkPoint> clean_mark_points;
      // max number of clients to erase at a time
      Counter erase_max;
      // unfinished last erase point
      Counter last_erase_point = 0;

      // NB: All threads declared at end, so they're destructed first!

      std::unique_ptr<RunEvery> cleaning_job;
835
11fdf7f2
TL
      // helper function to return the value of a variant if it matches the
      // given type T, or a default value of T otherwise
      template <typename T, typename Variant>
      static T get_or_default(const Variant& param, T default_value) {
	const T *p = boost::get<T>(&param);
	return p ? *p : default_value;
      }
7c673cae
FG
843
      // COMMON constructor that others feed into; we can accept three
      // different variations of durations
      //
      // at_limit_param: either an AtLimit policy, or a RejectThreshold
      // value (which implies AtLimit::Reject -- note the defaults below:
      // a threshold-bearing variant yields Reject plus that threshold).
      template<typename Rep, typename Per>
      PriorityQueueBase(ClientInfoFunc _client_info_f,
			std::chrono::duration<Rep,Per> _idle_age,
			std::chrono::duration<Rep,Per> _erase_age,
			std::chrono::duration<Rep,Per> _check_time,
			AtLimitParam at_limit_param,
			double _anticipation_timeout) :
	client_info_f(_client_info_f),
	at_limit(get_or_default(at_limit_param, AtLimit::Reject)),
	reject_threshold(get_or_default(at_limit_param, RejectThreshold{0})),
	anticipation_timeout(_anticipation_timeout),
	finishing(false),
	idle_age(std::chrono::duration_cast<Duration>(_idle_age)),
	erase_age(std::chrono::duration_cast<Duration>(_erase_age)),
	check_time(std::chrono::duration_cast<Duration>(_check_time)),
	erase_max(standard_erase_max)
      {
	// a client must be idle before it can be erased, and must be
	// checked more often than it ages into idleness
	assert(_erase_age >= _idle_age);
	assert(_check_time < _idle_age);
	// AtLimit::Reject depends on ImmediateTagCalc
	assert(at_limit != AtLimit::Reject || !IsDelayed);
	// periodic background job that marks idle clients and erases
	// long-idle ones
	cleaning_job =
	  std::unique_ptr<RunEvery>(
	    new RunEvery(check_time,
			 std::bind(&PriorityQueueBase::do_clean, this)));
      }
872
873
      ~PriorityQueueBase() {
	// signal shutdown; cleaning_job is declared last among members so
	// it is destructed (stopping its periodic work) first
	finishing = true;
      }
877
878
11fdf7f2
TL
      // Return the client's QoS parameters. With dynamic client info
      // (U1 == true) the lookup function is re-invoked and the cache
      // refreshed on every call; otherwise the cached pointer is used.
      // data_mtx should be held by the caller (mutates client.info).
      inline const ClientInfo* get_cli_info(ClientRec& client) const {
	if (is_dynamic_cli_info_f) {
	  client.info = client_info_f(client.client);
	}
	return client.info;
      }
885
      // data_mtx must be held by caller
      //
      // Delayed-tag-calculation variant: a real tag is computed only when
      // the request will sit at the front of the client's queue (anything
      // deeper gets its tag computed later, in update_next_tag, using the
      // freshest delta/rho). Otherwise a zeroed placeholder is returned.
      RequestTag initial_tag(DelayedTagCalc delayed, ClientRec& client,
			     const ReqParams& params, Time time, Cost cost) {
	RequestTag tag(0, 0, 0, time, 0, 0, cost); // placeholder

	// only calculate a tag if the request is going straight to the front
	if (!client.has_request()) {
	  const ClientInfo* client_info = get_cli_info(client);
	  assert(client_info);
	  tag = RequestTag(client.get_req_tag(), *client_info,
			   params, time, cost, anticipation_timeout);

	  // copy tag to previous tag for client
	  client.update_req_tag(tag, tick);
	}
	return tag;
      }
903
      // data_mtx must be held by caller
      //
      // Immediate-tag-calculation variant: every request gets its final
      // tag at insertion time (required by AtLimit::Reject, which needs an
      // accurate limit tag up front).
      RequestTag initial_tag(ImmediateTagCalc imm, ClientRec& client,
			     const ReqParams& params, Time time, Cost cost) {
	// calculate the tag unconditionally
	const ClientInfo* client_info = get_cli_info(client);
	assert(client_info);
	RequestTag tag(client.get_req_tag(), *client_info,
		       params, time, cost, anticipation_timeout);

	// copy tag to previous tag for client
	client.update_req_tag(tag, tick);
	return tag;
      }
917
      // data_mtx must be held by caller. returns 0 on success. when using
      // AtLimit::Reject, requests that would exceed their limit are rejected
      // with EAGAIN, and the queue will not take ownership of the given
      // 'request' argument
      int do_add_request(RequestRef&& request,
			 const C& client_id,
			 const ReqParams& req_params,
			 const Time time,
			 const Cost cost = 1u) {
	++tick;

	// find-or-create the client record; emplace tells us via .second
	// whether this client is new
	auto insert = client_map.emplace(client_id, ClientRecRef{});
	if (insert.second) {
	  // new client entry
	  const ClientInfo* info = client_info_f(client_id);
	  auto client_rec = std::make_shared<ClientRec>(client_id, info, tick);
	  resv_heap.push(client_rec);
#if USE_PROP_HEAP
	  prop_heap.push(client_rec);
#endif
	  limit_heap.push(client_rec);
	  ready_heap.push(client_rec);
	  insert.first->second = std::move(client_rec);
	}

	// for convenience, we'll create a reference to the shared pointer
	ClientRec& client = *insert.first->second;

	if (client.idle) {
	  // We need to do an adjustment so that idle clients compete
	  // fairly on proportional tags since those tags may have
	  // drifted from real-time. Either use the lowest existing
	  // proportion tag -- O(1) -- or the client with the lowest
	  // previous proportion tag -- O(n) where n = # clients.
	  //
	  // So we don't have to maintain a proportional queue that
	  // keeps the minimum on proportional tag alone (we're
	  // instead using a ready queue), we'll have to check each
	  // client.
	  //
	  // The alternative would be to maintain a proportional queue
	  // (define USE_PROP_TAG) and do an O(1) operation here.

	  // Was unable to confirm whether equality testing on
	  // std::numeric_limits<double>::max() is guaranteed, so
	  // we'll use a compile-time calculated trigger that is one
	  // third the max, which should be much larger than any
	  // expected organic value.
	  constexpr double lowest_prop_tag_trigger =
	    std::numeric_limits<double>::max() / 3.0;

	  double lowest_prop_tag = std::numeric_limits<double>::max();
	  for (auto const &c : client_map) {
	    // don't use ourselves (or anything else that might be
	    // listed as idle) since we're now in the map
	    if (!c.second->idle) {
	      double p;
	      // use either lowest proportion tag or previous proportion tag
	      if (c.second->has_request()) {
		p = c.second->next_request().tag.proportion +
		  c.second->prop_delta;
	      } else {
		p = c.second->get_req_tag().proportion + c.second->prop_delta;
	      }

	      if (p < lowest_prop_tag) {
		lowest_prop_tag = p;
	      }
	    }
	  }

	  // if this conditional does not fire, no active client supplied a
	  // usable proportion tag, so prop_delta keeps its previous value
	  if (lowest_prop_tag < lowest_prop_tag_trigger) {
	    client.prop_delta = lowest_prop_tag - time;
	  }
	  client.idle = false;
	} // if this client was idle

	// tag is real (ImmediateTagCalc) or a placeholder (DelayedTagCalc
	// when the client already has queued requests)
	RequestTag tag = initial_tag(TagCalc{}, client, req_params, time, cost);

	if (at_limit == AtLimit::Reject &&
	    tag.limit > time + reject_threshold) {
	  // if the client is over its limit, reject it here
	  return EAGAIN;
	}

	client.add_request(tag, std::move(request));
	if (1 == client.requests.size()) {
	  // NB: can the following 4 calls to adjust be changed
	  // promote? Can adding a request ever demote a client in the
	  // heaps?
	  resv_heap.adjust(client);
	  limit_heap.adjust(client);
	  ready_heap.adjust(client);
#if USE_PROP_HEAP
	  prop_heap.adjust(client);
#endif
	}

	// remember the latest distributed-mode parameters for delayed tag
	// calculation of subsequent requests
	client.cur_rho = req_params.rho;
	client.cur_delta = req_params.delta;

	resv_heap.adjust(client);
	limit_heap.adjust(client);
	ready_heap.adjust(client);
#if USE_PROP_HEAP
	prop_heap.adjust(client);
#endif
	return 0;
      } // do_add_request
1028
      // data_mtx must be held by caller
      //
      // DelayedTagCalc: after popping a request, compute the real tag for
      // the client's new front request (using the most recent delta/rho)
      // based on the tag of the request just popped.
      void update_next_tag(DelayedTagCalc delayed, ClientRec& top,
			   const RequestTag& tag) {
	if (top.has_request()) {
	  // perform delayed tag calculation on the next request
	  ClientReq& next_first = top.next_request();
	  const ClientInfo* client_info = get_cli_info(top);
	  assert(client_info);
	  next_first.tag = RequestTag(tag, *client_info,
				      top.cur_delta, top.cur_rho,
				      next_first.tag.arrival,
				      next_first.tag.cost,
				      anticipation_timeout);
	  // copy tag to previous tag for client
	  top.update_req_tag(next_first.tag, tick);
	}
      }
7c673cae 1046
11fdf7f2
TL
      // data_mtx must be held by caller
      //
      // Immediate-tag-calculation overload: intentionally a no-op,
      // because in immediate mode a request's tag is fully computed
      // at the time the request is inserted into the queue.
      void update_next_tag(ImmediateTagCalc imm, ClientRec& top,
			   const RequestTag& tag) {
	// the next tag was already calculated on insertion
      }
7c673cae
FG
1051
      // data_mtx should be held when called; top of heap should have
      // a ready request
      //
      // Pops the next request of the client at the top of `heap`,
      // hands it to the `process` callback, re-positions the client in
      // all heaps, and returns the dispatched request's tag (callers
      // use the returned tag to reduce reservation tags after a
      // proportion-phase dispatch).
      template<typename C1, IndIntruHeapData ClientRec::*C2, typename C3>
      RequestTag pop_process_request(IndIntruHeap<C1, ClientRec, C2, C3, B>& heap,
				     std::function<void(const C& client,
							const Cost cost,
							RequestRef& request)> process) {
	// gain access to data
	ClientRec& top = heap.top();

	// capture cost, request, and tag before popping mutates the queue
	Cost request_cost = top.next_request().tag.cost;
	RequestRef request = std::move(top.next_request().request);
	RequestTag tag = top.next_request().tag;

	// pop request and adjust heaps
	top.pop_request();

	// delayed mode: stamp the client's new front request from `tag`;
	// immediate mode: no-op (TagCalc selects the overload)
	update_next_tag(TagCalc{}, top, tag);

	// client's next request changed, so its position in every heap
	// may have; resv/ready/prop can only move toward the bottom
	// (demote), while limit may move either way (adjust)
	resv_heap.demote(top);
	limit_heap.adjust(top);
#if USE_PROP_HEAP
	prop_heap.demote(top);
#endif
	ready_heap.demote(top);

	// process
	process(top.client, request_cost, request);

	return tag;
      } // pop_process_request
1083
1084
11fdf7f2
TL
1085 // data_mtx must be held by caller
1086 void reduce_reservation_tags(DelayedTagCalc delayed, ClientRec& client,
1087 const RequestTag& tag) {
1088 if (!client.requests.empty()) {
1089 // only maintain a tag for the first request
1090 auto& r = client.requests.front();
1091 r.tag.reservation -=
20effc67 1092 client.info->reservation_inv * (tag.cost + tag.rho);
11fdf7f2
TL
1093 }
1094 }
1095
7c673cae 1096 // data_mtx should be held when called
11fdf7f2
TL
1097 void reduce_reservation_tags(ImmediateTagCalc imm, ClientRec& client,
1098 const RequestTag& tag) {
1099 double res_offset =
20effc67 1100 client.info->reservation_inv * (tag.cost + tag.rho);
7c673cae 1101 for (auto& r : client.requests) {
11fdf7f2 1102 r.tag.reservation -= res_offset;
7c673cae 1103 }
7c673cae
FG
1104 }
1105
7c673cae 1106 // data_mtx should be held when called
11fdf7f2 1107 void reduce_reservation_tags(const C& client_id, const RequestTag& tag) {
7c673cae
FG
1108 auto client_it = client_map.find(client_id);
1109
1110 // means the client was cleaned from map; should never happen
1111 // as long as cleaning times are long enough
1112 assert(client_map.end() != client_it);
11fdf7f2
TL
1113 ClientRec& client = *client_it->second;
1114 reduce_reservation_tags(TagCalc{}, client, tag);
1115
1116 // don't forget to update previous tag
1117 client.prev_tag.reservation -=
20effc67 1118 client.info->reservation_inv * (tag.cost + tag.rho);
11fdf7f2 1119 resv_heap.promote(client);
7c673cae
FG
1120 }
1121
1122
      // data_mtx should be held when called
      //
      // Core scheduling decision. Tries, in order:
      //   1. constraint-based: a reservation tag that is already due;
      //   2. weight-based: a request marked ready (within limit);
      //   3. if at_limit == Allow, limit-breaking by lowest proportion
      //      or, failing that, lowest reservation tag;
      // otherwise returns the earliest future time at which a
      // reservation or limit tag will come due (or none at all).
      NextReq do_next_request(Time now) {
	// if reservation queue is empty, all are empty (i.e., no
	// active clients)
	if(resv_heap.empty()) {
	  return NextReq::none();
	}

	// try constraint (reservation) based scheduling

	auto& reserv = resv_heap.top();
	if (reserv.has_request() &&
	    reserv.next_request().tag.reservation <= now) {
	  return NextReq(HeapId::reservation);
	}

	// no existing reservations before now, so try weight-based
	// scheduling

	// all items that are within limit are eligible based on
	// priority; migrate every newly-eligible top-of-limit-heap
	// request into the ready set
	auto limits = &limit_heap.top();
	while (limits->has_request() &&
	       !limits->next_request().tag.ready &&
	       limits->next_request().tag.limit <= now) {
	  limits->next_request().tag.ready = true;
	  ready_heap.promote(*limits);
	  limit_heap.demote(*limits);

	  limits = &limit_heap.top();
	}

	auto& readys = ready_heap.top();
	if (readys.has_request() &&
	    readys.next_request().tag.ready &&
	    readys.next_request().tag.proportion < max_tag) {
	  return NextReq(HeapId::ready);
	}

	// if nothing is schedulable by reservation or
	// proportion/weight, and if we allow limit break, try to
	// schedule something with the lowest proportion tag or
	// alternatively lowest reservation tag.
	if (at_limit == AtLimit::Allow) {
	  if (readys.has_request() &&
	      readys.next_request().tag.proportion < max_tag) {
	    return NextReq(HeapId::ready);
	  } else if (reserv.has_request() &&
		     reserv.next_request().tag.reservation < max_tag) {
	    return NextReq(HeapId::reservation);
	  }
	}

	// nothing scheduled; make sure we re-run when next
	// reservation item or next limited item comes up

	Time next_call = TimeMax;
	if (resv_heap.top().has_request()) {
	  next_call =
	    min_not_0_time(next_call,
			   resv_heap.top().next_request().tag.reservation);
	}
	if (limit_heap.top().has_request()) {
	  const auto& next = limit_heap.top().next_request();
	  // a ready request at the top of the limit heap should only
	  // occur when its proportion tag is maxed out
	  assert(!next.tag.ready || max_tag == next.tag.proportion);
	  next_call = min_not_0_time(next_call, next.tag.limit);
	}
	if (next_call < TimeMax) {
	  return NextReq(next_call);
	} else {
	  return NextReq::none();
	}
      } // do_next_request
1196
1197
1198 // if possible is not zero and less than current then return it;
1199 // otherwise return current; the idea is we're trying to find
1200 // the minimal time but ignoring zero
1201 static inline const Time& min_not_0_time(const Time& current,
1202 const Time& possible) {
1203 return TimeZero == possible ? current : std::min(current, possible);
1204 }
1205
1206
1207 /*
1208 * This is being called regularly by RunEvery. Every time it's
1209 * called it notes the time and delta counter (mark point) in a
1210 * deque. It also looks at the deque to find the most recent
1211 * mark point that is older than clean_age. It then walks the
1212 * map and delete all server entries that were last used before
1213 * that mark point.
1214 */
1215 void do_clean() {
1216 TimePoint now = std::chrono::steady_clock::now();
1217 DataGuard g(data_mtx);
1218 clean_mark_points.emplace_back(MarkPoint(now, tick));
1219
1220 // first erase the super-old client records
1221
11fdf7f2 1222 Counter erase_point = last_erase_point;
7c673cae
FG
1223 auto point = clean_mark_points.front();
1224 while (point.first <= now - erase_age) {
11fdf7f2
TL
1225 last_erase_point = point.second;
1226 erase_point = last_erase_point;
7c673cae
FG
1227 clean_mark_points.pop_front();
1228 point = clean_mark_points.front();
1229 }
1230
1231 Counter idle_point = 0;
1232 for (auto i : clean_mark_points) {
1233 if (i.first <= now - idle_age) {
1234 idle_point = i.second;
1235 } else {
1236 break;
1237 }
1238 }
1239
11fdf7f2 1240 Counter erased_num = 0;
7c673cae
FG
1241 if (erase_point > 0 || idle_point > 0) {
1242 for (auto i = client_map.begin(); i != client_map.end(); /* empty */) {
1243 auto i2 = i++;
11fdf7f2
TL
1244 if (erase_point &&
1245 erased_num < erase_max &&
1246 i2->second->last_tick <= erase_point) {
7c673cae
FG
1247 delete_from_heaps(i2->second);
1248 client_map.erase(i2);
11fdf7f2 1249 erased_num++;
7c673cae
FG
1250 } else if (idle_point && i2->second->last_tick <= idle_point) {
1251 i2->second->idle = true;
1252 }
1253 } // for
11fdf7f2
TL
1254
1255 auto wperiod = check_time;
1256 if (erased_num >= erase_max) {
1257 wperiod = duration_cast<milliseconds>(aggressive_check_time);
1258 } else {
1259 // clean finished, refresh
1260 last_erase_point = 0;
1261 }
1262 cleaning_job->try_update(wperiod);
7c673cae
FG
1263 } // if
1264 } // do_clean
1265
1266
1267 // data_mtx must be held by caller
1268 template<IndIntruHeapData ClientRec::*C1,typename C2>
1269 void delete_from_heap(ClientRecRef& client,
1270 c::IndIntruHeap<ClientRecRef,ClientRec,C1,C2,B>& heap) {
11fdf7f2 1271 auto i = heap.at(client);
7c673cae
FG
1272 heap.remove(i);
1273 }
1274
1275
1276 // data_mtx must be held by caller
1277 void delete_from_heaps(ClientRecRef& client) {
1278 delete_from_heap(client, resv_heap);
1279#if USE_PROP_HEAP
1280 delete_from_heap(client, prop_heap);
1281#endif
1282 delete_from_heap(client, limit_heap);
1283 delete_from_heap(client, ready_heap);
1284 }
1285 }; // class PriorityQueueBase
1286
1287
    // Pull-style dmclock queue: the server asks for the next request
    // via pull_request(); the queue never pushes work out itself.
    // Template parameters mirror PriorityQueueBase: C = client id
    // type, R = request type, IsDelayed selects delayed tag
    // calculation, B is the heap branching factor.
    template<typename C, typename R, bool IsDelayed=false, bool U1=false, unsigned B=2>
    class PullPriorityQueue : public PriorityQueueBase<C,R,IsDelayed,U1,B> {
      using super = PriorityQueueBase<C,R,IsDelayed,U1,B>;

    public:

      // When a request is pulled, this is the return type.
      struct PullReq {
	// payload when a request is actually returned
	struct Retn {
	  C client;
	  typename super::RequestRef request;
	  PhaseType phase;
	  Cost cost;
	};

	// none / future / returning; selects which variant member of
	// `data` is active (Retn for returning, Time for future)
	typename super::NextReqType type;
	boost::variant<Retn,Time> data;

	bool is_none() const { return type == super::NextReqType::none; }

	bool is_retn() const { return type == super::NextReqType::returning; }
	Retn& get_retn() {
	  return boost::get<Retn>(data);
	}

	bool is_future() const { return type == super::NextReqType::future; }
	// time at which a request is expected to become schedulable;
	// only valid when is_future()
	Time getTime() const { return boost::get<Time>(data); }
      };


#ifdef PROFILE
      ProfileTimer<std::chrono::nanoseconds> pull_request_timer;
      ProfileTimer<std::chrono::nanoseconds> add_request_timer;
#endif

      // pull full constructor; ages/check time control background
      // cleaning of stale client records in the base class
      template<typename Rep, typename Per>
      PullPriorityQueue(typename super::ClientInfoFunc _client_info_f,
			std::chrono::duration<Rep,Per> _idle_age,
			std::chrono::duration<Rep,Per> _erase_age,
			std::chrono::duration<Rep,Per> _check_time,
			AtLimitParam at_limit_param = AtLimit::Wait,
			double _anticipation_timeout = 0.0) :
	super(_client_info_f,
	      _idle_age, _erase_age, _check_time,
	      at_limit_param, _anticipation_timeout)
      {
	// empty
      }


      // pull convenience constructor
      PullPriorityQueue(typename super::ClientInfoFunc _client_info_f,
			AtLimitParam at_limit_param = AtLimit::Wait,
			double _anticipation_timeout = 0.0) :
	PullPriorityQueue(_client_info_f,
			  standard_idle_age,
			  standard_erase_age,
			  standard_check_time,
			  at_limit_param,
			  _anticipation_timeout)
      {
	// empty
      }


      // convenience overload: takes ownership of the request by move,
      // stamps it with the current time
      int add_request(R&& request,
		      const C& client_id,
		      const ReqParams& req_params,
		      const Cost cost = 1u) {
	return add_request(typename super::RequestRef(new R(std::move(request))),
			   client_id,
			   req_params,
			   get_time(),
			   cost);
      }


      // convenience overload: default (zero) rho/delta parameters
      int add_request(R&& request,
		      const C& client_id,
		      const Cost cost = 1u) {
	static const ReqParams null_req_params;
	return add_request(typename super::RequestRef(new R(std::move(request))),
			   client_id,
			   null_req_params,
			   get_time(),
			   cost);
      }


      // convenience overload: caller supplies the arrival time
      int add_request_time(R&& request,
			   const C& client_id,
			   const ReqParams& req_params,
			   const Time time,
			   const Cost cost = 1u) {
	return add_request(typename super::RequestRef(new R(std::move(request))),
			   client_id,
			   req_params,
			   time,
			   cost);
      }


      int add_request(typename super::RequestRef&& request,
		      const C& client_id,
		      const ReqParams& req_params,
		      const Cost cost = 1u) {
	return add_request(std::move(request), client_id, req_params, get_time(), cost);
      }


      int add_request(typename super::RequestRef&& request,
		      const C& client_id,
		      const Cost cost = 1u) {
	static const ReqParams null_req_params;
	return add_request(std::move(request), client_id, null_req_params, get_time(), cost);
      }


      // this does the work; the versions above provide alternate interfaces
      //
      // returns 0 on success; may return EAGAIN via do_add_request
      // when the queue is configured to reject over-limit clients
      int add_request(typename super::RequestRef&& request,
		      const C& client_id,
		      const ReqParams& req_params,
		      const Time time,
		      const Cost cost = 1u) {
	typename super::DataGuard g(this->data_mtx);
#ifdef PROFILE
	add_request_timer.start();
#endif
	int r = super::do_add_request(std::move(request),
				      client_id,
				      req_params,
				      time,
				      cost);
	// no call to schedule_request for pull version
#ifdef PROFILE
	add_request_timer.stop();
#endif
	return r;
      }


      inline PullReq pull_request() {
	return pull_request(get_time());
      }


      // asks the scheduler for the next request as of time `now`;
      // result is none, a future wake-up time, or an actual request
      PullReq pull_request(const Time now) {
	PullReq result;
	typename super::DataGuard g(this->data_mtx);
#ifdef PROFILE
	pull_request_timer.start();
#endif

	typename super::NextReq next = super::do_next_request(now);
	result.type = next.type;
	switch(next.type) {
	case super::NextReqType::none:
	  return result;
	case super::NextReqType::future:
	  result.data = next.when_ready;
	  return result;
	case super::NextReqType::returning:
	  // to avoid nesting, break out and let code below handle this case
	  break;
	default:
	  assert(false);
	}

	// we'll only get here if we're returning an entry

	// builds the callback that pop_process_request uses to move
	// the dispatched request into `result`
	auto process_f =
	  [&] (PullReq& pull_result, PhaseType phase) ->
	  std::function<void(const C&,
			     uint64_t,
			     typename super::RequestRef&)> {
	  return [&pull_result, phase](const C& client,
				       const Cost request_cost,
				       typename super::RequestRef& request) {
	    pull_result.data = typename PullReq::Retn{ client,
						       std::move(request),
						       phase,
						       request_cost };
	  };
	};

	switch(next.heap_id) {
	case super::HeapId::reservation:
	  // reservation-phase dispatch; reservation tags are NOT
	  // reduced in this case
	  (void) super::pop_process_request(this->resv_heap,
					    process_f(result,
						      PhaseType::reservation));
	  ++this->reserv_sched_count;
	  break;
	case super::HeapId::ready:
	  {
	    auto tag = super::pop_process_request(this->ready_heap,
						  process_f(result, PhaseType::priority));
	    // need to use retn temporarily
	    auto& retn = boost::get<typename PullReq::Retn>(result.data);
	    super::reduce_reservation_tags(retn.client, tag);
	  }
	  ++this->prop_sched_count;
	  break;
	default:
	  assert(false);
	}

#ifdef PROFILE
	pull_request_timer.stop();
#endif
	return result;
      } // pull_request


    protected:


      // data_mtx should be held when called; unfortunately this
      // function has to be repeated in both push & pull
      // specializations
      typename super::NextReq next_request() {
	return next_request(get_time());
      }
    }; // class PullPriorityQueue
1511
20effc67
TL
1512#ifndef WITH_SEASTAR
1513 // TODO: PushPriorityQueue is not ported to seastar yet
    // PUSH version
    //
    // Push-style dmclock queue: the queue actively delivers requests
    // to the server via handle_f whenever can_handle_f says the
    // server has capacity; a background thread (sched_ahead_thd)
    // wakes up to deliver requests whose tags come due in the future.
    template<typename C, typename R, bool IsDelayed=false, bool U1=false, unsigned B=2>
    class PushPriorityQueue : public PriorityQueueBase<C,R,IsDelayed,U1,B> {

    protected:

      using super = PriorityQueueBase<C,R,IsDelayed,U1,B>;

    public:

      // a function to see whether the server can handle another request
      using CanHandleRequestFunc = std::function<bool(void)>;

      // a function to submit a request to the server; the second
      // parameter is a callback when it's completed
      using HandleRequestFunc =
	std::function<void(const C&,typename super::RequestRef,PhaseType,uint64_t)>;

    protected:

      CanHandleRequestFunc can_handle_f;
      HandleRequestFunc handle_f;
      // for handling timed scheduling
      std::mutex sched_ahead_mtx;
      std::condition_variable sched_ahead_cv;
      // next future time at which the sched-ahead thread should run;
      // TimeZero means "nothing scheduled"
      Time sched_ahead_when = TimeZero;

#ifdef PROFILE
    public:
      ProfileTimer<std::chrono::nanoseconds> add_request_timer;
      ProfileTimer<std::chrono::nanoseconds> request_complete_timer;
    protected:
#endif

      // NB: threads declared last, so constructed last and destructed first

      std::thread sched_ahead_thd;

    public:

      // push full constructor
      template<typename Rep, typename Per>
      PushPriorityQueue(typename super::ClientInfoFunc _client_info_f,
			CanHandleRequestFunc _can_handle_f,
			HandleRequestFunc _handle_f,
			std::chrono::duration<Rep,Per> _idle_age,
			std::chrono::duration<Rep,Per> _erase_age,
			std::chrono::duration<Rep,Per> _check_time,
			AtLimitParam at_limit_param = AtLimit::Wait,
			double anticipation_timeout = 0.0) :
	super(_client_info_f,
	      _idle_age, _erase_age, _check_time,
	      at_limit_param, anticipation_timeout)
      {
	can_handle_f = _can_handle_f;
	handle_f = _handle_f;
	// started last, after all members above are initialized
	sched_ahead_thd = std::thread(&PushPriorityQueue::run_sched_ahead, this);
      }


      // push convenience constructor
      PushPriorityQueue(typename super::ClientInfoFunc _client_info_f,
			CanHandleRequestFunc _can_handle_f,
			HandleRequestFunc _handle_f,
			AtLimitParam at_limit_param = AtLimit::Wait,
			double _anticipation_timeout = 0.0) :
	PushPriorityQueue(_client_info_f,
			  _can_handle_f,
			  _handle_f,
			  standard_idle_age,
			  standard_erase_age,
			  standard_check_time,
			  at_limit_param,
			  _anticipation_timeout)
      {
	// empty
      }


      // signals the sched-ahead thread to exit (notify under the
      // mutex so the wake-up cannot be missed) and joins it
      ~PushPriorityQueue() {
	this->finishing = true;
	{
	  std::lock_guard<std::mutex> l(sched_ahead_mtx);
	  sched_ahead_cv.notify_one();
	}
	sched_ahead_thd.join();
      }

    public:

      // convenience overload: takes ownership of the request by move,
      // stamps it with the current time
      int add_request(R&& request,
		      const C& client_id,
		      const ReqParams& req_params,
		      const Cost cost = 1u) {
	return add_request(typename super::RequestRef(new R(std::move(request))),
			   client_id,
			   req_params,
			   get_time(),
			   cost);
      }


      int add_request(typename super::RequestRef&& request,
		      const C& client_id,
		      const ReqParams& req_params,
		      const Cost cost = 1u) {
	return add_request(std::move(request), client_id, req_params, get_time(), cost);
      }


      // convenience overload: copies the request, caller supplies the
      // arrival time
      int add_request_time(const R& request,
			   const C& client_id,
			   const ReqParams& req_params,
			   const Time time,
			   const Cost cost = 1u) {
	return add_request(typename super::RequestRef(new R(request)),
			   client_id,
			   req_params,
			   time,
			   cost);
      }


      // this does the work; on success, immediately tries to push
      // schedulable work to the server. Returns 0 on success; may
      // return EAGAIN via do_add_request when rejecting at limit.
      int add_request(typename super::RequestRef&& request,
		      const C& client_id,
		      const ReqParams& req_params,
		      const Time time,
		      const Cost cost = 1u) {
	typename super::DataGuard g(this->data_mtx);
#ifdef PROFILE
	add_request_timer.start();
#endif
	int r = super::do_add_request(std::move(request),
				      client_id,
				      req_params,
				      time,
				      cost);
	if (r == 0) {
	  (void) schedule_request();
	}
#ifdef PROFILE
	add_request_timer.stop();
#endif
	return r;
      }


      // server calls this when it finishes a request, giving the
      // queue a chance to push the next one
      void request_completed() {
	typename super::DataGuard g(this->data_mtx);
#ifdef PROFILE
	request_complete_timer.start();
#endif
	(void) schedule_request();
#ifdef PROFILE
	request_complete_timer.stop();
#endif
      }

    protected:

      // data_mtx should be held when called; furthermore, the heap
      // should not be empty and the top element of the heap should
      // not be already handled
      //
      // NOTE: the use of "super::ClientRec" in either the template
      // construct or as a parameter to submit_top_request generated
      // a compiler error in g++ 4.8.4, when ClientRec was
      // "protected" rather than "public". By g++ 6.3.1 this was not
      // an issue. But for backwards compatibility
      // PriorityQueueBase::ClientRec is public.
      template<typename C1,
	       IndIntruHeapData super::ClientRec::*C2,
	       typename C3,
	       unsigned B4>
      typename super::RequestMeta
      submit_top_request(IndIntruHeap<C1,typename super::ClientRec,C2,C3,B4>& heap,
			 PhaseType phase) {
	C client_result;
	RequestTag tag = super::pop_process_request(heap,
						    [this, phase, &client_result]
						    (const C& client,
						     const Cost request_cost,
						     typename super::RequestRef& request) {
						      client_result = client;
						      handle_f(client, std::move(request), phase, request_cost);
						    });
	typename super::RequestMeta req(client_result, tag);
	return req;
      }


      // data_mtx should be held when called
      //
      // dispatches the top request of the heap selected by heap_id to
      // the server via submit_top_request
      void submit_request(typename super::HeapId heap_id) {
	switch(heap_id) {
	case super::HeapId::reservation:
	  // don't need to note client
	  (void) submit_top_request(this->resv_heap, PhaseType::reservation);
	  // unlike the other two cases, we do not reduce reservation
	  // tags here
	  ++this->reserv_sched_count;
	  break;
	case super::HeapId::ready:
	  {
	    auto req = submit_top_request(this->ready_heap, PhaseType::priority);
	    super::reduce_reservation_tags(req.client_id, req.tag);
	  }
	  ++this->prop_sched_count;
	  break;
	default:
	  assert(false);
	}
      } // submit_request


      // data_mtx should be held when called; unfortunately this
      // function has to be repeated in both push & pull
      // specializations
      typename super::NextReq next_request() {
	return next_request(get_time());
      }


      // data_mtx should be held when called; overrides member
      // function in base class to add check for whether a request can
      // be pushed to the server
      typename super::NextReq next_request(Time now) {
	if (!can_handle_f()) {
	  typename super::NextReq result;
	  result.type = super::NextReqType::none;
	  return result;
	} else {
	  return super::do_next_request(now);
	}
      } // next_request


      // data_mtx should be held when called
      //
      // runs one scheduling decision: submits a ready request, or
      // arranges a future wake-up via sched_at, or does nothing;
      // returns the decision type so callers can loop until "future"
      typename super::NextReqType schedule_request() {
	typename super::NextReq next_req = next_request();
	switch (next_req.type) {
	case super::NextReqType::none:
	  break;
	case super::NextReqType::future:
	  sched_at(next_req.when_ready);
	  break;
	case super::NextReqType::returning:
	  submit_request(next_req.heap_id);
	  break;
	default:
	  assert(false);
	}
	return next_req.type;
      }


      // this is the thread that handles running schedule_request at
      // future times when nothing can be scheduled immediately
      void run_sched_ahead() {
	std::unique_lock<std::mutex> l(sched_ahead_mtx);

	while (!this->finishing) {
	  // predicate for cond.wait()
	  const auto pred = [this] () -> bool {
	    return this->finishing || sched_ahead_when > TimeZero;
	  };

	  if (TimeZero == sched_ahead_when) {
	    // nothing scheduled; sleep until sched_at or the
	    // destructor wakes us
	    sched_ahead_cv.wait(l, pred);
	  } else {
	    // cast from Time -> duration<Time> -> Duration -> TimePoint
	    const auto until = typename super::TimePoint{
	      duration_cast<typename super::Duration>(
		std::chrono::duration<Time>{sched_ahead_when})};
	    sched_ahead_cv.wait_until(l, until, pred);
	    sched_ahead_when = TimeZero;
	    if (this->finishing) return;

	    // drop sched_ahead_mtx before taking data_mtx in
	    // schedule_request
	    l.unlock();
	    if (!this->finishing) {
	      // keep dispatching until the queue drains or the next
	      // item lies in the future
	      do {
		typename super::DataGuard g(this->data_mtx);
		if (schedule_request() == super::NextReqType::future)
		  break;
	      } while (!this->empty());
	    }
	    l.lock();
	  }
	}
      }


      // records `when` as the next wake-up for the sched-ahead
      // thread, keeping only the earliest pending wake-up time
      void sched_at(Time when) {
	std::lock_guard<std::mutex> l(sched_ahead_mtx);
	if (this->finishing) return;
	if (TimeZero == sched_ahead_when || when < sched_ahead_when) {
	  sched_ahead_when = when;
	  sched_ahead_cv.notify_one();
	}
      }
    }; // class PushPriorityQueue
20effc67 1814#endif // !WITH_SEASTAR
7c673cae
FG
1815 } // namespace dmclock
1816} // namespace crimson