]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | /* | |
5 | * Copyright (C) 2017 Red Hat Inc. | |
6 | */ | |
7 | ||
8 | ||
9 | #pragma once | |
10 | ||
11 | #include <map> | |
12 | #include <deque> | |
13 | #include <chrono> | |
14 | #include <thread> | |
15 | #include <mutex> | |
16 | #include <condition_variable> | |
17 | ||
18 | #include "run_every.h" | |
19 | #include "dmclock_util.h" | |
20 | #include "dmclock_recs.h" | |
21 | ||
7c673cae FG |
22 | |
23 | namespace crimson { | |
24 | namespace dmclock { | |
25 | struct ServerInfo { | |
26 | Counter delta_prev_req; | |
27 | Counter rho_prev_req; | |
28 | uint32_t my_delta; | |
29 | uint32_t my_rho; | |
30 | ||
31 | ServerInfo(Counter _delta_prev_req, | |
32 | Counter _rho_prev_req) : | |
33 | delta_prev_req(_delta_prev_req), | |
34 | rho_prev_req(_rho_prev_req), | |
35 | my_delta(0), | |
36 | my_rho(0) | |
37 | { | |
38 | // empty | |
39 | } | |
40 | ||
41 | inline void req_update(Counter delta, Counter rho) { | |
42 | delta_prev_req = delta; | |
43 | rho_prev_req = rho; | |
44 | my_delta = 0; | |
45 | my_rho = 0; | |
46 | } | |
47 | ||
48 | inline void resp_update(PhaseType phase) { | |
49 | ++my_delta; | |
50 | if (phase == PhaseType::reservation) ++my_rho; | |
51 | } | |
52 | }; | |
53 | ||
54 | ||
55 | // S is server identifier type | |
56 | template<typename S> | |
57 | class ServiceTracker { | |
d2e6a577 FG |
58 | // we don't want to include gtest.h just for FRIEND_TEST |
59 | friend class dmclock_client_server_erase_Test; | |
7c673cae FG |
60 | |
61 | using TimePoint = decltype(std::chrono::steady_clock::now()); | |
62 | using Duration = std::chrono::milliseconds; | |
63 | using MarkPoint = std::pair<TimePoint,Counter>; | |
64 | ||
65 | Counter delta_counter; // # reqs completed | |
66 | Counter rho_counter; // # reqs completed via reservation | |
67 | std::map<S,ServerInfo> server_map; | |
68 | mutable std::mutex data_mtx; // protects Counters and map | |
69 | ||
70 | using DataGuard = std::lock_guard<decltype(data_mtx)>; | |
71 | ||
72 | // clean config | |
73 | ||
74 | std::deque<MarkPoint> clean_mark_points; | |
75 | Duration clean_age; // age at which ServerInfo cleaned | |
76 | ||
77 | // NB: All threads declared at end, so they're destructed firs! | |
78 | ||
79 | std::unique_ptr<RunEvery> cleaning_job; | |
80 | ||
81 | ||
82 | public: | |
83 | ||
84 | // we have to start the counters at 1, as 0 is used in the | |
85 | // cleaning process | |
86 | template<typename Rep, typename Per> | |
87 | ServiceTracker(std::chrono::duration<Rep,Per> _clean_every, | |
88 | std::chrono::duration<Rep,Per> _clean_age) : | |
89 | delta_counter(1), | |
90 | rho_counter(1), | |
91 | clean_age(std::chrono::duration_cast<Duration>(_clean_age)) | |
92 | { | |
93 | cleaning_job = | |
94 | std::unique_ptr<RunEvery>( | |
95 | new RunEvery(_clean_every, | |
96 | std::bind(&ServiceTracker::do_clean, this))); | |
97 | } | |
98 | ||
99 | ||
100 | // the reason we're overloading the constructor rather than | |
101 | // using default values for the arguments is so that callers | |
102 | // have to either use all defaults or specify all timings; with | |
103 | // default arguments they could specify some without others | |
104 | ServiceTracker() : | |
105 | ServiceTracker(std::chrono::minutes(5), std::chrono::minutes(10)) | |
106 | { | |
107 | // empty | |
108 | } | |
109 | ||
110 | ||
111 | /* | |
112 | * Incorporates the RespParams received into the various counter. | |
113 | */ | |
114 | void track_resp(const S& server_id, const PhaseType& phase) { | |
115 | DataGuard g(data_mtx); | |
116 | ||
117 | auto it = server_map.find(server_id); | |
118 | if (server_map.end() == it) { | |
119 | // this code can only run if a request did not precede the | |
120 | // response or if the record was cleaned up b/w when | |
121 | // the request was made and now | |
122 | ServerInfo si(delta_counter, rho_counter); | |
123 | si.resp_update(phase); | |
124 | server_map.emplace(server_id, si); | |
125 | } else { | |
126 | it->second.resp_update(phase); | |
127 | } | |
128 | ||
129 | ++delta_counter; | |
130 | if (PhaseType::reservation == phase) { | |
131 | ++rho_counter; | |
132 | } | |
133 | } | |
134 | ||
135 | ||
136 | /* | |
137 | * Returns the ReqParams for the given server. | |
138 | */ | |
139 | ReqParams get_req_params(const S& server) { | |
140 | DataGuard g(data_mtx); | |
141 | auto it = server_map.find(server); | |
142 | if (server_map.end() == it) { | |
143 | server_map.emplace(server, ServerInfo(delta_counter, rho_counter)); | |
144 | return ReqParams(1, 1); | |
145 | } else { | |
146 | Counter delta = | |
147 | 1 + delta_counter - it->second.delta_prev_req - it->second.my_delta; | |
148 | Counter rho = | |
149 | 1 + rho_counter - it->second.rho_prev_req - it->second.my_rho; | |
150 | ||
151 | it->second.req_update(delta_counter, rho_counter); | |
152 | ||
153 | return ReqParams(uint32_t(delta), uint32_t(rho)); | |
154 | } | |
155 | } | |
156 | ||
157 | private: | |
158 | ||
159 | /* | |
160 | * This is being called regularly by RunEvery. Every time it's | |
161 | * called it notes the time and delta counter (mark point) in a | |
162 | * deque. It also looks at the deque to find the most recent | |
163 | * mark point that is older than clean_age. It then walks the | |
164 | * map and delete all server entries that were last used before | |
165 | * that mark point. | |
166 | */ | |
167 | void do_clean() { | |
168 | TimePoint now = std::chrono::steady_clock::now(); | |
169 | DataGuard g(data_mtx); | |
170 | clean_mark_points.emplace_back(MarkPoint(now, delta_counter)); | |
171 | ||
172 | Counter earliest = 0; | |
173 | auto point = clean_mark_points.front(); | |
174 | while (point.first <= now - clean_age) { | |
175 | earliest = point.second; | |
176 | clean_mark_points.pop_front(); | |
177 | point = clean_mark_points.front(); | |
178 | } | |
179 | ||
180 | if (earliest > 0) { | |
181 | for (auto i = server_map.begin(); | |
182 | i != server_map.end(); | |
183 | /* empty */) { | |
184 | auto i2 = i++; | |
185 | if (i2->second.delta_prev_req <= earliest) { | |
186 | server_map.erase(i2); | |
187 | } | |
188 | } | |
189 | } | |
190 | } // do_clean | |
191 | }; // class ServiceTracker | |
192 | } | |
193 | } |