]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | /* | |
5 | * Copyright (C) 2016 Red Hat Inc. | |
6 | */ | |
7 | ||
8 | ||
9 | #pragma once | |
10 | ||
11 | ||
12 | #include <atomic> | |
13 | #include <mutex> | |
14 | #include <condition_variable> | |
15 | #include <thread> | |
16 | #include <chrono> | |
17 | #include <vector> | |
18 | #include <deque> | |
19 | #include <iostream> | |
20 | ||
21 | #include "sim_recs.h" | |
22 | ||
23 | ||
24 | namespace crimson { | |
25 | namespace qos_simulation { | |
26 | ||
27 | struct req_op_t {}; | |
28 | struct wait_op_t {}; | |
29 | constexpr struct req_op_t req_op {}; | |
30 | constexpr struct wait_op_t wait_op {}; | |
31 | ||
32 | ||
33 | enum class CliOp { req, wait }; | |
34 | struct CliInst { | |
35 | CliOp op; | |
36 | union { | |
37 | std::chrono::milliseconds wait_time; | |
38 | struct { | |
39 | uint32_t count; | |
40 | std::chrono::microseconds time_bw_reqs; | |
41 | uint16_t max_outstanding; | |
42 | } req_params; | |
43 | } args; | |
44 | ||
45 | // D is a duration type | |
46 | template<typename D> | |
47 | CliInst(wait_op_t, D duration) : | |
48 | op(CliOp::wait) | |
49 | { | |
50 | args.wait_time = | |
51 | std::chrono::duration_cast<std::chrono::milliseconds>(duration); | |
52 | } | |
53 | ||
54 | CliInst(req_op_t, | |
55 | uint32_t count, double ops_per_sec, uint16_t max_outstanding) : | |
56 | op(CliOp::req) | |
57 | { | |
58 | args.req_params.count = count; | |
59 | args.req_params.max_outstanding = max_outstanding; | |
60 | uint32_t us = uint32_t(0.5 + 1.0 / ops_per_sec * 1000000); | |
61 | args.req_params.time_bw_reqs = std::chrono::microseconds(us); | |
62 | } | |
63 | }; | |
64 | ||
65 | ||
66 | using ServerSelectFunc = std::function<const ServerId&(uint64_t seed)>; | |
67 | ||
68 | ||
69 | template<typename SvcTrk, typename ReqPm, typename RespPm, typename Accum> | |
70 | class SimulatedClient { | |
71 | public: | |
72 | ||
73 | struct InternalStats { | |
74 | std::mutex mtx; | |
75 | std::chrono::nanoseconds track_resp_time; | |
76 | std::chrono::nanoseconds get_req_params_time; | |
77 | uint32_t track_resp_count; | |
78 | uint32_t get_req_params_count; | |
79 | ||
80 | InternalStats() : | |
81 | track_resp_time(0), | |
82 | get_req_params_time(0), | |
83 | track_resp_count(0), | |
84 | get_req_params_count(0) | |
85 | { | |
86 | // empty | |
87 | } | |
88 | }; | |
89 | ||
90 | using SubmitFunc = | |
91 | std::function<void(const ServerId&, | |
92 | const TestRequest&, | |
93 | const ClientId&, | |
94 | const ReqPm&)>; | |
95 | ||
96 | using ClientAccumFunc = std::function<void(Accum&,const RespPm&)>; | |
97 | ||
98 | typedef std::chrono::time_point<std::chrono::steady_clock> TimePoint; | |
99 | ||
100 | static TimePoint now() { return std::chrono::steady_clock::now(); } | |
101 | ||
102 | protected: | |
103 | ||
104 | struct RespQueueItem { | |
105 | TestResponse response; | |
106 | ServerId server_id; | |
107 | RespPm resp_params; | |
108 | }; | |
109 | ||
110 | const ClientId id; | |
111 | const SubmitFunc submit_f; | |
112 | const ServerSelectFunc server_select_f; | |
113 | const ClientAccumFunc accum_f; | |
114 | ||
115 | std::vector<CliInst> instructions; | |
116 | ||
117 | SvcTrk service_tracker; | |
118 | ||
119 | // TODO: use lock rather than atomic??? | |
120 | std::atomic_ulong outstanding_ops; | |
121 | std::atomic_bool requests_complete; | |
122 | ||
123 | std::deque<RespQueueItem> resp_queue; | |
124 | ||
125 | std::mutex mtx_req; | |
126 | std::condition_variable cv_req; | |
127 | ||
128 | std::mutex mtx_resp; | |
129 | std::condition_variable cv_resp; | |
130 | ||
131 | using RespGuard = std::lock_guard<decltype(mtx_resp)>; | |
132 | using Lock = std::unique_lock<std::mutex>; | |
133 | ||
134 | // data collection | |
135 | ||
136 | std::vector<TimePoint> op_times; | |
137 | Accum accumulator; | |
138 | InternalStats internal_stats; | |
139 | ||
140 | std::thread thd_req; | |
141 | std::thread thd_resp; | |
142 | ||
143 | public: | |
144 | ||
145 | SimulatedClient(ClientId _id, | |
146 | const SubmitFunc& _submit_f, | |
147 | const ServerSelectFunc& _server_select_f, | |
148 | const ClientAccumFunc& _accum_f, | |
149 | const std::vector<CliInst>& _instrs) : | |
150 | id(_id), | |
151 | submit_f(_submit_f), | |
152 | server_select_f(_server_select_f), | |
153 | accum_f(_accum_f), | |
154 | instructions(_instrs), | |
155 | service_tracker(), | |
156 | outstanding_ops(0), | |
157 | requests_complete(false) | |
158 | { | |
159 | size_t op_count = 0; | |
160 | for (auto i : instructions) { | |
161 | if (CliOp::req == i.op) { | |
162 | op_count += i.args.req_params.count; | |
163 | } | |
164 | } | |
165 | op_times.reserve(op_count); | |
166 | ||
167 | thd_resp = std::thread(&SimulatedClient::run_resp, this); | |
168 | thd_req = std::thread(&SimulatedClient::run_req, this); | |
169 | } | |
170 | ||
171 | ||
172 | SimulatedClient(ClientId _id, | |
173 | const SubmitFunc& _submit_f, | |
174 | const ServerSelectFunc& _server_select_f, | |
175 | const ClientAccumFunc& _accum_f, | |
176 | uint16_t _ops_to_run, | |
177 | double _iops_goal, | |
178 | uint16_t _outstanding_ops_allowed) : | |
179 | SimulatedClient(_id, | |
180 | _submit_f, _server_select_f, _accum_f, | |
181 | {{req_op, _ops_to_run, _iops_goal, _outstanding_ops_allowed}}) | |
182 | { | |
183 | // empty | |
184 | } | |
185 | ||
186 | ||
187 | SimulatedClient(const SimulatedClient&) = delete; | |
188 | SimulatedClient(SimulatedClient&&) = delete; | |
189 | SimulatedClient& operator=(const SimulatedClient&) = delete; | |
190 | SimulatedClient& operator=(SimulatedClient&&) = delete; | |
191 | ||
192 | virtual ~SimulatedClient() { | |
193 | wait_until_done(); | |
194 | } | |
195 | ||
196 | void receive_response(const TestResponse& resp, | |
197 | const ServerId& server_id, | |
198 | const RespPm& resp_params) { | |
199 | RespGuard g(mtx_resp); | |
200 | resp_queue.push_back(RespQueueItem{resp, server_id, resp_params}); | |
201 | cv_resp.notify_one(); | |
202 | } | |
203 | ||
204 | const std::vector<TimePoint>& get_op_times() const { return op_times; } | |
205 | ||
206 | void wait_until_done() { | |
207 | if (thd_req.joinable()) thd_req.join(); | |
208 | if (thd_resp.joinable()) thd_resp.join(); | |
209 | } | |
210 | ||
211 | const Accum& get_accumulator() const { return accumulator; } | |
212 | ||
213 | const InternalStats& get_internal_stats() const { return internal_stats; } | |
214 | ||
215 | protected: | |
216 | ||
217 | void run_req() { | |
218 | size_t ops_count = 0; | |
219 | for (auto i : instructions) { | |
220 | if (CliOp::wait == i.op) { | |
221 | std::this_thread::sleep_for(i.args.wait_time); | |
222 | } else if (CliOp::req == i.op) { | |
223 | Lock l(mtx_req); | |
224 | for (uint64_t o = 0; o < i.args.req_params.count; ++o) { | |
225 | while (outstanding_ops >= i.args.req_params.max_outstanding) { | |
226 | cv_req.wait(l); | |
227 | } | |
228 | ||
229 | l.unlock(); | |
230 | auto now = std::chrono::steady_clock::now(); | |
231 | const ServerId& server = server_select_f(o); | |
232 | ||
233 | ReqPm rp = | |
234 | time_stats_w_return<decltype(internal_stats.get_req_params_time), | |
235 | ReqPm>(internal_stats.mtx, | |
236 | internal_stats.get_req_params_time, | |
237 | [&]() -> ReqPm { | |
238 | return service_tracker.get_req_params(server); | |
239 | }); | |
240 | count_stats(internal_stats.mtx, | |
241 | internal_stats.get_req_params_count); | |
242 | ||
243 | TestRequest req(server, o, 12); | |
244 | submit_f(server, req, id, rp); | |
245 | ++outstanding_ops; | |
246 | l.lock(); // lock for return to top of loop | |
247 | ||
248 | auto delay_time = now + i.args.req_params.time_bw_reqs; | |
249 | while (std::chrono::steady_clock::now() < delay_time) { | |
250 | cv_req.wait_until(l, delay_time); | |
251 | } // while | |
252 | } // for | |
253 | ops_count += i.args.req_params.count; | |
254 | } else { | |
255 | assert(false); | |
256 | } | |
257 | } // for loop | |
258 | ||
259 | requests_complete = true; | |
260 | ||
261 | // all requests made, thread ends | |
262 | } | |
263 | ||
264 | ||
265 | void run_resp() { | |
266 | std::chrono::milliseconds delay(1000); | |
267 | int op = 0; | |
268 | ||
269 | Lock l(mtx_resp); | |
270 | ||
271 | // since the following code would otherwise be repeated (except for | |
272 | // the call to notify_one) in the two loops below; let's avoid | |
273 | // repetition and define it once. | |
274 | const auto proc_resp = [this, &op, &l](const bool notify_req_cv) { | |
275 | if (!resp_queue.empty()) { | |
276 | RespQueueItem item = resp_queue.front(); | |
277 | resp_queue.pop_front(); | |
278 | ||
279 | l.unlock(); | |
280 | ||
281 | // data collection | |
282 | ||
283 | op_times.push_back(now()); | |
284 | accum_f(accumulator, item.resp_params); | |
285 | ||
286 | // processing | |
287 | ||
288 | #if 0 // not needed | |
289 | TestResponse& resp = item.response; | |
290 | #endif | |
291 | ||
292 | time_stats(internal_stats.mtx, | |
293 | internal_stats.track_resp_time, | |
294 | [&](){ | |
295 | service_tracker.track_resp(item.server_id, item.resp_params); | |
296 | }); | |
297 | count_stats(internal_stats.mtx, | |
298 | internal_stats.track_resp_count); | |
299 | ||
300 | --outstanding_ops; | |
301 | if (notify_req_cv) { | |
302 | cv_req.notify_one(); | |
303 | } | |
304 | ||
305 | l.lock(); | |
306 | } | |
307 | }; | |
308 | ||
309 | while(!requests_complete.load()) { | |
310 | while(resp_queue.empty() && !requests_complete.load()) { | |
311 | cv_resp.wait_for(l, delay); | |
312 | } | |
313 | proc_resp(true); | |
314 | } | |
315 | ||
316 | while(outstanding_ops.load() > 0) { | |
317 | while(resp_queue.empty() && outstanding_ops.load() > 0) { | |
318 | cv_resp.wait_for(l, delay); | |
319 | } | |
320 | proc_resp(false); // don't call notify_one as all requests are complete | |
321 | } | |
322 | ||
323 | // all responses received, thread ends | |
324 | } | |
325 | }; // class SimulatedClient | |
326 | ||
327 | ||
328 | }; // namespace qos_simulation | |
329 | }; // namespace crimson |