]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | /* | |
5 | * Copyright (C) 2017 Red Hat Inc. | |
6 | */ | |
7 | ||
8 | ||
9 | #pragma once | |
10 | ||
11 | #include <map> | |
12 | #include <deque> | |
13 | #include <chrono> | |
14 | #include <thread> | |
15 | #include <mutex> | |
16 | #include <condition_variable> | |
17 | ||
18 | #include "run_every.h" | |
19 | #include "dmclock_util.h" | |
20 | #include "dmclock_recs.h" | |
21 | ||
22 | #include "gtest/gtest_prod.h" | |
23 | ||
24 | ||
25 | namespace crimson { | |
26 | namespace dmclock { | |
27 | struct ServerInfo { | |
28 | Counter delta_prev_req; | |
29 | Counter rho_prev_req; | |
30 | uint32_t my_delta; | |
31 | uint32_t my_rho; | |
32 | ||
33 | ServerInfo(Counter _delta_prev_req, | |
34 | Counter _rho_prev_req) : | |
35 | delta_prev_req(_delta_prev_req), | |
36 | rho_prev_req(_rho_prev_req), | |
37 | my_delta(0), | |
38 | my_rho(0) | |
39 | { | |
40 | // empty | |
41 | } | |
42 | ||
43 | inline void req_update(Counter delta, Counter rho) { | |
44 | delta_prev_req = delta; | |
45 | rho_prev_req = rho; | |
46 | my_delta = 0; | |
47 | my_rho = 0; | |
48 | } | |
49 | ||
50 | inline void resp_update(PhaseType phase) { | |
51 | ++my_delta; | |
52 | if (phase == PhaseType::reservation) ++my_rho; | |
53 | } | |
54 | }; | |
55 | ||
56 | ||
57 | // S is server identifier type | |
58 | template<typename S> | |
59 | class ServiceTracker { | |
60 | FRIEND_TEST(dmclock_client, server_erase); | |
61 | ||
62 | using TimePoint = decltype(std::chrono::steady_clock::now()); | |
63 | using Duration = std::chrono::milliseconds; | |
64 | using MarkPoint = std::pair<TimePoint,Counter>; | |
65 | ||
66 | Counter delta_counter; // # reqs completed | |
67 | Counter rho_counter; // # reqs completed via reservation | |
68 | std::map<S,ServerInfo> server_map; | |
69 | mutable std::mutex data_mtx; // protects Counters and map | |
70 | ||
71 | using DataGuard = std::lock_guard<decltype(data_mtx)>; | |
72 | ||
73 | // clean config | |
74 | ||
75 | std::deque<MarkPoint> clean_mark_points; | |
76 | Duration clean_age; // age at which ServerInfo cleaned | |
77 | ||
78 | // NB: All threads declared at end, so they're destructed firs! | |
79 | ||
80 | std::unique_ptr<RunEvery> cleaning_job; | |
81 | ||
82 | ||
83 | public: | |
84 | ||
85 | // we have to start the counters at 1, as 0 is used in the | |
86 | // cleaning process | |
87 | template<typename Rep, typename Per> | |
88 | ServiceTracker(std::chrono::duration<Rep,Per> _clean_every, | |
89 | std::chrono::duration<Rep,Per> _clean_age) : | |
90 | delta_counter(1), | |
91 | rho_counter(1), | |
92 | clean_age(std::chrono::duration_cast<Duration>(_clean_age)) | |
93 | { | |
94 | cleaning_job = | |
95 | std::unique_ptr<RunEvery>( | |
96 | new RunEvery(_clean_every, | |
97 | std::bind(&ServiceTracker::do_clean, this))); | |
98 | } | |
99 | ||
100 | ||
101 | // the reason we're overloading the constructor rather than | |
102 | // using default values for the arguments is so that callers | |
103 | // have to either use all defaults or specify all timings; with | |
104 | // default arguments they could specify some without others | |
105 | ServiceTracker() : | |
106 | ServiceTracker(std::chrono::minutes(5), std::chrono::minutes(10)) | |
107 | { | |
108 | // empty | |
109 | } | |
110 | ||
111 | ||
112 | /* | |
113 | * Incorporates the RespParams received into the various counter. | |
114 | */ | |
115 | void track_resp(const S& server_id, const PhaseType& phase) { | |
116 | DataGuard g(data_mtx); | |
117 | ||
118 | auto it = server_map.find(server_id); | |
119 | if (server_map.end() == it) { | |
120 | // this code can only run if a request did not precede the | |
121 | // response or if the record was cleaned up b/w when | |
122 | // the request was made and now | |
123 | ServerInfo si(delta_counter, rho_counter); | |
124 | si.resp_update(phase); | |
125 | server_map.emplace(server_id, si); | |
126 | } else { | |
127 | it->second.resp_update(phase); | |
128 | } | |
129 | ||
130 | ++delta_counter; | |
131 | if (PhaseType::reservation == phase) { | |
132 | ++rho_counter; | |
133 | } | |
134 | } | |
135 | ||
136 | ||
137 | /* | |
138 | * Returns the ReqParams for the given server. | |
139 | */ | |
140 | ReqParams get_req_params(const S& server) { | |
141 | DataGuard g(data_mtx); | |
142 | auto it = server_map.find(server); | |
143 | if (server_map.end() == it) { | |
144 | server_map.emplace(server, ServerInfo(delta_counter, rho_counter)); | |
145 | return ReqParams(1, 1); | |
146 | } else { | |
147 | Counter delta = | |
148 | 1 + delta_counter - it->second.delta_prev_req - it->second.my_delta; | |
149 | Counter rho = | |
150 | 1 + rho_counter - it->second.rho_prev_req - it->second.my_rho; | |
151 | ||
152 | it->second.req_update(delta_counter, rho_counter); | |
153 | ||
154 | return ReqParams(uint32_t(delta), uint32_t(rho)); | |
155 | } | |
156 | } | |
157 | ||
158 | private: | |
159 | ||
160 | /* | |
161 | * This is being called regularly by RunEvery. Every time it's | |
162 | * called it notes the time and delta counter (mark point) in a | |
163 | * deque. It also looks at the deque to find the most recent | |
164 | * mark point that is older than clean_age. It then walks the | |
165 | * map and delete all server entries that were last used before | |
166 | * that mark point. | |
167 | */ | |
168 | void do_clean() { | |
169 | TimePoint now = std::chrono::steady_clock::now(); | |
170 | DataGuard g(data_mtx); | |
171 | clean_mark_points.emplace_back(MarkPoint(now, delta_counter)); | |
172 | ||
173 | Counter earliest = 0; | |
174 | auto point = clean_mark_points.front(); | |
175 | while (point.first <= now - clean_age) { | |
176 | earliest = point.second; | |
177 | clean_mark_points.pop_front(); | |
178 | point = clean_mark_points.front(); | |
179 | } | |
180 | ||
181 | if (earliest > 0) { | |
182 | for (auto i = server_map.begin(); | |
183 | i != server_map.end(); | |
184 | /* empty */) { | |
185 | auto i2 = i++; | |
186 | if (i2->second.delta_prev_req <= earliest) { | |
187 | server_map.erase(i2); | |
188 | } | |
189 | } | |
190 | } | |
191 | } // do_clean | |
192 | }; // class ServiceTracker | |
193 | } | |
194 | } |