]> git.proxmox.com Git - ceph.git/blame - ceph/src/dmclock/src/dmclock_client.h
update sources to v12.1.3
[ceph.git] / ceph / src / dmclock / src / dmclock_client.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4/*
5 * Copyright (C) 2017 Red Hat Inc.
6 */
7
8
9#pragma once
10
11#include <map>
12#include <deque>
13#include <chrono>
14#include <thread>
15#include <mutex>
16#include <condition_variable>
17
18#include "run_every.h"
19#include "dmclock_util.h"
20#include "dmclock_recs.h"
21
7c673cae
FG
22
23namespace crimson {
24 namespace dmclock {
25 struct ServerInfo {
26 Counter delta_prev_req;
27 Counter rho_prev_req;
28 uint32_t my_delta;
29 uint32_t my_rho;
30
31 ServerInfo(Counter _delta_prev_req,
32 Counter _rho_prev_req) :
33 delta_prev_req(_delta_prev_req),
34 rho_prev_req(_rho_prev_req),
35 my_delta(0),
36 my_rho(0)
37 {
38 // empty
39 }
40
41 inline void req_update(Counter delta, Counter rho) {
42 delta_prev_req = delta;
43 rho_prev_req = rho;
44 my_delta = 0;
45 my_rho = 0;
46 }
47
48 inline void resp_update(PhaseType phase) {
49 ++my_delta;
50 if (phase == PhaseType::reservation) ++my_rho;
51 }
52 };
53
54
55 // S is server identifier type
56 template<typename S>
57 class ServiceTracker {
d2e6a577
FG
58 // we don't want to include gtest.h just for FRIEND_TEST
59 friend class dmclock_client_server_erase_Test;
7c673cae
FG
60
61 using TimePoint = decltype(std::chrono::steady_clock::now());
62 using Duration = std::chrono::milliseconds;
63 using MarkPoint = std::pair<TimePoint,Counter>;
64
65 Counter delta_counter; // # reqs completed
66 Counter rho_counter; // # reqs completed via reservation
67 std::map<S,ServerInfo> server_map;
68 mutable std::mutex data_mtx; // protects Counters and map
69
70 using DataGuard = std::lock_guard<decltype(data_mtx)>;
71
72 // clean config
73
74 std::deque<MarkPoint> clean_mark_points;
75 Duration clean_age; // age at which ServerInfo cleaned
76
77 // NB: All threads declared at end, so they're destructed firs!
78
79 std::unique_ptr<RunEvery> cleaning_job;
80
81
82 public:
83
84 // we have to start the counters at 1, as 0 is used in the
85 // cleaning process
86 template<typename Rep, typename Per>
87 ServiceTracker(std::chrono::duration<Rep,Per> _clean_every,
88 std::chrono::duration<Rep,Per> _clean_age) :
89 delta_counter(1),
90 rho_counter(1),
91 clean_age(std::chrono::duration_cast<Duration>(_clean_age))
92 {
93 cleaning_job =
94 std::unique_ptr<RunEvery>(
95 new RunEvery(_clean_every,
96 std::bind(&ServiceTracker::do_clean, this)));
97 }
98
99
100 // the reason we're overloading the constructor rather than
101 // using default values for the arguments is so that callers
102 // have to either use all defaults or specify all timings; with
103 // default arguments they could specify some without others
104 ServiceTracker() :
105 ServiceTracker(std::chrono::minutes(5), std::chrono::minutes(10))
106 {
107 // empty
108 }
109
110
111 /*
112 * Incorporates the RespParams received into the various counter.
113 */
114 void track_resp(const S& server_id, const PhaseType& phase) {
115 DataGuard g(data_mtx);
116
117 auto it = server_map.find(server_id);
118 if (server_map.end() == it) {
119 // this code can only run if a request did not precede the
120 // response or if the record was cleaned up b/w when
121 // the request was made and now
122 ServerInfo si(delta_counter, rho_counter);
123 si.resp_update(phase);
124 server_map.emplace(server_id, si);
125 } else {
126 it->second.resp_update(phase);
127 }
128
129 ++delta_counter;
130 if (PhaseType::reservation == phase) {
131 ++rho_counter;
132 }
133 }
134
135
136 /*
137 * Returns the ReqParams for the given server.
138 */
139 ReqParams get_req_params(const S& server) {
140 DataGuard g(data_mtx);
141 auto it = server_map.find(server);
142 if (server_map.end() == it) {
143 server_map.emplace(server, ServerInfo(delta_counter, rho_counter));
144 return ReqParams(1, 1);
145 } else {
146 Counter delta =
147 1 + delta_counter - it->second.delta_prev_req - it->second.my_delta;
148 Counter rho =
149 1 + rho_counter - it->second.rho_prev_req - it->second.my_rho;
150
151 it->second.req_update(delta_counter, rho_counter);
152
153 return ReqParams(uint32_t(delta), uint32_t(rho));
154 }
155 }
156
157 private:
158
159 /*
160 * This is being called regularly by RunEvery. Every time it's
161 * called it notes the time and delta counter (mark point) in a
162 * deque. It also looks at the deque to find the most recent
163 * mark point that is older than clean_age. It then walks the
164 * map and delete all server entries that were last used before
165 * that mark point.
166 */
167 void do_clean() {
168 TimePoint now = std::chrono::steady_clock::now();
169 DataGuard g(data_mtx);
170 clean_mark_points.emplace_back(MarkPoint(now, delta_counter));
171
172 Counter earliest = 0;
173 auto point = clean_mark_points.front();
174 while (point.first <= now - clean_age) {
175 earliest = point.second;
176 clean_mark_points.pop_front();
177 point = clean_mark_points.front();
178 }
179
180 if (earliest > 0) {
181 for (auto i = server_map.begin();
182 i != server_map.end();
183 /* empty */) {
184 auto i2 = i++;
185 if (i2->second.delta_prev_req <= earliest) {
186 server_map.erase(i2);
187 }
188 }
189 }
190 } // do_clean
191 }; // class ServiceTracker
192 }
193}