]> git.proxmox.com Git - ceph.git/blob - ceph/src/dmclock/src/dmclock_client.h
b44e1211b53f667daf23a8ed651480f4574a931d
[ceph.git] / ceph / src / dmclock / src / dmclock_client.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 /*
5 * Copyright (C) 2017 Red Hat Inc.
6 */
7
8
9 #pragma once
10
11 #include <map>
12 #include <deque>
13 #include <chrono>
14 #include <thread>
15 #include <mutex>
16 #include <condition_variable>
17
18 #include "run_every.h"
19 #include "dmclock_util.h"
20 #include "dmclock_recs.h"
21
22 #include "gtest/gtest_prod.h"
23
24
25 namespace crimson {
26 namespace dmclock {
27 struct ServerInfo {
28 Counter delta_prev_req;
29 Counter rho_prev_req;
30 uint32_t my_delta;
31 uint32_t my_rho;
32
33 ServerInfo(Counter _delta_prev_req,
34 Counter _rho_prev_req) :
35 delta_prev_req(_delta_prev_req),
36 rho_prev_req(_rho_prev_req),
37 my_delta(0),
38 my_rho(0)
39 {
40 // empty
41 }
42
43 inline void req_update(Counter delta, Counter rho) {
44 delta_prev_req = delta;
45 rho_prev_req = rho;
46 my_delta = 0;
47 my_rho = 0;
48 }
49
50 inline void resp_update(PhaseType phase) {
51 ++my_delta;
52 if (phase == PhaseType::reservation) ++my_rho;
53 }
54 };
55
56
57 // S is server identifier type
58 template<typename S>
59 class ServiceTracker {
60 FRIEND_TEST(dmclock_client, server_erase);
61
62 using TimePoint = decltype(std::chrono::steady_clock::now());
63 using Duration = std::chrono::milliseconds;
64 using MarkPoint = std::pair<TimePoint,Counter>;
65
66 Counter delta_counter; // # reqs completed
67 Counter rho_counter; // # reqs completed via reservation
68 std::map<S,ServerInfo> server_map;
69 mutable std::mutex data_mtx; // protects Counters and map
70
71 using DataGuard = std::lock_guard<decltype(data_mtx)>;
72
73 // clean config
74
75 std::deque<MarkPoint> clean_mark_points;
76 Duration clean_age; // age at which ServerInfo cleaned
77
78 // NB: All threads declared at end, so they're destructed firs!
79
80 std::unique_ptr<RunEvery> cleaning_job;
81
82
83 public:
84
85 // we have to start the counters at 1, as 0 is used in the
86 // cleaning process
87 template<typename Rep, typename Per>
88 ServiceTracker(std::chrono::duration<Rep,Per> _clean_every,
89 std::chrono::duration<Rep,Per> _clean_age) :
90 delta_counter(1),
91 rho_counter(1),
92 clean_age(std::chrono::duration_cast<Duration>(_clean_age))
93 {
94 cleaning_job =
95 std::unique_ptr<RunEvery>(
96 new RunEvery(_clean_every,
97 std::bind(&ServiceTracker::do_clean, this)));
98 }
99
100
101 // the reason we're overloading the constructor rather than
102 // using default values for the arguments is so that callers
103 // have to either use all defaults or specify all timings; with
104 // default arguments they could specify some without others
105 ServiceTracker() :
106 ServiceTracker(std::chrono::minutes(5), std::chrono::minutes(10))
107 {
108 // empty
109 }
110
111
112 /*
113 * Incorporates the RespParams received into the various counter.
114 */
115 void track_resp(const S& server_id, const PhaseType& phase) {
116 DataGuard g(data_mtx);
117
118 auto it = server_map.find(server_id);
119 if (server_map.end() == it) {
120 // this code can only run if a request did not precede the
121 // response or if the record was cleaned up b/w when
122 // the request was made and now
123 ServerInfo si(delta_counter, rho_counter);
124 si.resp_update(phase);
125 server_map.emplace(server_id, si);
126 } else {
127 it->second.resp_update(phase);
128 }
129
130 ++delta_counter;
131 if (PhaseType::reservation == phase) {
132 ++rho_counter;
133 }
134 }
135
136
137 /*
138 * Returns the ReqParams for the given server.
139 */
140 ReqParams get_req_params(const S& server) {
141 DataGuard g(data_mtx);
142 auto it = server_map.find(server);
143 if (server_map.end() == it) {
144 server_map.emplace(server, ServerInfo(delta_counter, rho_counter));
145 return ReqParams(1, 1);
146 } else {
147 Counter delta =
148 1 + delta_counter - it->second.delta_prev_req - it->second.my_delta;
149 Counter rho =
150 1 + rho_counter - it->second.rho_prev_req - it->second.my_rho;
151
152 it->second.req_update(delta_counter, rho_counter);
153
154 return ReqParams(uint32_t(delta), uint32_t(rho));
155 }
156 }
157
158 private:
159
160 /*
161 * This is being called regularly by RunEvery. Every time it's
162 * called it notes the time and delta counter (mark point) in a
163 * deque. It also looks at the deque to find the most recent
164 * mark point that is older than clean_age. It then walks the
165 * map and delete all server entries that were last used before
166 * that mark point.
167 */
168 void do_clean() {
169 TimePoint now = std::chrono::steady_clock::now();
170 DataGuard g(data_mtx);
171 clean_mark_points.emplace_back(MarkPoint(now, delta_counter));
172
173 Counter earliest = 0;
174 auto point = clean_mark_points.front();
175 while (point.first <= now - clean_age) {
176 earliest = point.second;
177 clean_mark_points.pop_front();
178 point = clean_mark_points.front();
179 }
180
181 if (earliest > 0) {
182 for (auto i = server_map.begin();
183 i != server_map.end();
184 /* empty */) {
185 auto i2 = i++;
186 if (i2->second.delta_prev_req <= earliest) {
187 server_map.erase(i2);
188 }
189 }
190 }
191 } // do_clean
192 }; // class ServiceTracker
193 }
194 }