// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

/*
 * Copyright (C) 2016 Red Hat Inc.
 *
 * Author: J. Eric Ivancich <ivancich@redhat.com>
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License version
 * 2.1, as published by the Free Software Foundation. See file
 * COPYING.
 */


#pragma once

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

#include "sim_recs.h"
29 | ||
30 | ||
namespace crimson {
namespace qos_simulation {
33 | ||
// Tag types (and matching tag constants) used to pick the CliInst
// constructor overload below, in the style of std::piecewise_construct.
struct req_op_t {};
struct wait_op_t {};
constexpr req_op_t req_op {};
constexpr wait_op_t wait_op {};


// A single client instruction: either pause for a fixed interval
// (CliOp::wait) or issue a batch of requests at a target rate
// (CliOp::req). The payload lives in a union discriminated by `op`.
enum class CliOp { req, wait };
struct CliInst {
  CliOp op;
  union {
    std::chrono::milliseconds wait_time;
    struct {
      uint32_t count;
      std::chrono::microseconds time_bw_reqs;
      uint16_t max_outstanding;
    } req_params;
  } args;

  // D is a duration type
  template<typename D>
  CliInst(wait_op_t, D duration) :
    op(CliOp::wait)
  {
    // normalize whatever duration the caller passed to milliseconds
    args.wait_time =
      std::chrono::duration_cast<std::chrono::milliseconds>(duration);
  }

  CliInst(req_op_t,
	  uint32_t count, double ops_per_sec, uint16_t max_outstanding) :
    op(CliOp::req)
  {
    args.req_params.max_outstanding = max_outstanding;
    args.req_params.count = count;
    // convert the ops/sec goal into a rounded inter-request gap in µs
    const double gap_us = 1.0 / ops_per_sec * 1000000;
    args.req_params.time_bw_reqs =
      std::chrono::microseconds(static_cast<uint32_t>(0.5 + gap_us));
  }
};
71 | ||
72 | ||
73 | using ServerSelectFunc = std::function<const ServerId&(uint64_t seed)>; | |
74 | ||
75 | ||
// Simulates one client in the QoS/dmclock simulation. Two worker
// threads are started by the constructor: run_req issues TestRequests
// according to the instruction list, and run_resp drains the queue of
// responses delivered via receive_response. Template parameters:
//   SvcTrk - service tracker; supplies get_req_params()/track_resp()
//   ReqPm  - per-request scheduling parameters sent with each request
//   RespPm - per-response parameters returned by the server
//   Accum  - accumulator type folded over response parameters
template<typename SvcTrk, typename ReqPm, typename RespPm, typename Accum>
class SimulatedClient {
public:

  // Timing/count statistics for the two service-tracker calls; mtx
  // guards every field since both worker threads update them (via the
  // time_stats/count_stats helpers, presumably from sim_recs.h).
  struct InternalStats {
    std::mutex mtx;
    std::chrono::nanoseconds track_resp_time;     // cumulative time in track_resp
    std::chrono::nanoseconds get_req_params_time; // cumulative time in get_req_params
    uint32_t track_resp_count;
    uint32_t get_req_params_count;

    InternalStats() :
      track_resp_time(0),
      get_req_params_time(0),
      track_resp_count(0),
      get_req_params_count(0)
    {
      // empty
    }
  };

  // Callback that delivers a request to a server.
  using SubmitFunc =
    std::function<void(const ServerId&,
		       TestRequest&&,
		       const ClientId&,
		       const ReqPm&)>;

  // Folds one response's parameters into the accumulator.
  using ClientAccumFunc = std::function<void(Accum&,const RespPm&)>;

  typedef std::chrono::time_point<std::chrono::steady_clock> TimePoint;

  static TimePoint now() { return std::chrono::steady_clock::now(); }

protected:

  // One queued response, exactly as handed to receive_response.
  struct RespQueueItem {
    TestResponse response;
    ServerId server_id;
    RespPm resp_params;
    Cost request_cost;
  };

  const ClientId id;
  const SubmitFunc submit_f;
  const ServerSelectFunc server_select_f;
  const ClientAccumFunc accum_f;

  std::vector<CliInst> instructions;

  SvcTrk service_tracker;

  // TODO: use lock rather than atomic???
  std::atomic_ulong outstanding_ops;  // submitted but not yet answered
  std::atomic_bool requests_complete; // set by run_req after its last instruction

  std::deque<RespQueueItem> resp_queue; // guarded by mtx_resp

  std::mutex mtx_req;
  std::condition_variable cv_req;  // signaled when outstanding_ops drops

  std::mutex mtx_resp;
  std::condition_variable cv_resp; // signaled when a response is queued

  using RespGuard = std::lock_guard<decltype(mtx_resp)>;
  using Lock = std::unique_lock<std::mutex>;

  // data collection

  std::vector<TimePoint> op_times; // completion time of each op; response thread only
  Accum accumulator;
  InternalStats internal_stats;

  std::thread thd_req;
  std::thread thd_resp;

public:

  // Starts both worker threads immediately; the object must outlive
  // them (the destructor joins via wait_until_done).
  SimulatedClient(ClientId _id,
		  const SubmitFunc& _submit_f,
		  const ServerSelectFunc& _server_select_f,
		  const ClientAccumFunc& _accum_f,
		  const std::vector<CliInst>& _instrs) :
    id(_id),
    submit_f(_submit_f),
    server_select_f(_server_select_f),
    accum_f(_accum_f),
    instructions(_instrs),
    service_tracker(),
    outstanding_ops(0),
    requests_complete(false)
  {
    // pre-size op_times to the total number of requests we will issue
    size_t op_count = 0;
    for (auto i : instructions) {
      if (CliOp::req == i.op) {
	op_count += i.args.req_params.count;
      }
    }
    op_times.reserve(op_count);

    thd_resp = std::thread(&SimulatedClient::run_resp, this);
    thd_req = std::thread(&SimulatedClient::run_req, this);
  }


  // Convenience constructor: a client that runs a single request
  // instruction (count, rate goal, outstanding-op cap).
  SimulatedClient(ClientId _id,
		  const SubmitFunc& _submit_f,
		  const ServerSelectFunc& _server_select_f,
		  const ClientAccumFunc& _accum_f,
		  uint16_t _ops_to_run,
		  double _iops_goal,
		  uint16_t _outstanding_ops_allowed) :
    SimulatedClient(_id,
		    _submit_f, _server_select_f, _accum_f,
		    {{req_op, _ops_to_run, _iops_goal, _outstanding_ops_allowed}})
  {
    // empty
  }


  // Non-copyable/non-movable: worker threads hold `this`.
  SimulatedClient(const SimulatedClient&) = delete;
  SimulatedClient(SimulatedClient&&) = delete;
  SimulatedClient& operator=(const SimulatedClient&) = delete;
  SimulatedClient& operator=(SimulatedClient&&) = delete;

  virtual ~SimulatedClient() {
    wait_until_done();
  }

  // Called (from the server/network side) to hand a response back to
  // this client; copies the data into resp_queue and wakes run_resp.
  void receive_response(const TestResponse& resp,
			const ServerId& server_id,
			const RespPm& resp_params,
			const Cost request_cost) {
    RespGuard g(mtx_resp);
    resp_queue.push_back(
      RespQueueItem{ resp, server_id, resp_params, request_cost });
    cv_resp.notify_one();
  }

  // Valid once both threads have finished (see wait_until_done).
  const std::vector<TimePoint>& get_op_times() const { return op_times; }

  // Blocks until both worker threads have exited; safe to call twice.
  void wait_until_done() {
    if (thd_req.joinable()) thd_req.join();
    if (thd_resp.joinable()) thd_resp.join();
  }

  const Accum& get_accumulator() const { return accumulator; }

  const InternalStats& get_internal_stats() const { return internal_stats; }

protected:

  // Request thread: walks the instruction list, sleeping for wait
  // instructions and pacing out submissions for req instructions while
  // honoring the max_outstanding cap. Sets requests_complete at exit.
  void run_req() {
    size_t ops_count = 0;
    for (auto i : instructions) {
      if (CliOp::wait == i.op) {
	std::this_thread::sleep_for(i.args.wait_time);
      } else if (CliOp::req == i.op) {
	Lock l(mtx_req);
	for (uint64_t o = 0; o < i.args.req_params.count; ++o) {
	  // throttle: wait until the response thread retires an op
	  while (outstanding_ops >= i.args.req_params.max_outstanding) {
	    cv_req.wait(l);
	  }

	  // drop the lock while calling out to the tracker and submit_f
	  l.unlock();
	  auto now = std::chrono::steady_clock::now();
	  const ServerId& server = server_select_f(o);

	  // timed call to the service tracker for this request's params
	  ReqPm rp =
	    time_stats_w_return<decltype(internal_stats.get_req_params_time),
				ReqPm>(internal_stats.mtx,
				       internal_stats.get_req_params_time,
				       [&]() -> ReqPm {
					 return service_tracker.get_req_params(server);
				       });
	  count_stats(internal_stats.mtx,
		      internal_stats.get_req_params_count);

	  // NOTE(review): the literal 12 looks like a fixed request
	  // size/cost passed to TestRequest -- confirm in sim_recs.h.
	  submit_f(server,
		   TestRequest{server, static_cast<uint32_t>(o), 12},
		   id, rp);
	  // NOTE(review): incremented after submit_f; assumes the
	  // matching decrement in run_resp cannot win the race first.
	  ++outstanding_ops;
	  l.lock(); // lock for return to top of loop

	  // pace the next submission; loop guards spurious wakeups
	  auto delay_time = now + i.args.req_params.time_bw_reqs;
	  while (std::chrono::steady_clock::now() < delay_time) {
	    cv_req.wait_until(l, delay_time);
	  } // while
	} // for
	ops_count += i.args.req_params.count;
      } else {
	assert(false);
      }
    } // for loop

    requests_complete = true;

    // all requests made, thread ends
  }


  // Response thread: drains resp_queue, records completion times,
  // feeds the accumulator and service tracker, and retires
  // outstanding_ops. Runs until all requests are made AND answered.
  void run_resp() {
    std::chrono::milliseconds delay(1000); // wakeup period while waiting
    int op = 0; // NOTE(review): captured below but never used

    Lock l(mtx_resp);

    // since the following code would otherwise be repeated (except for
    // the call to notify_one) in the two loops below; let's avoid
    // repetition and define it once.
    const auto proc_resp = [this, &op, &l](const bool notify_req_cv) {
      if (!resp_queue.empty()) {
	RespQueueItem item = resp_queue.front();
	resp_queue.pop_front();

	// unlock while doing per-response work; relocked at the end
	l.unlock();

	// data collection

	op_times.push_back(now());
	accum_f(accumulator, item.resp_params);

	// processing

#if 0 // not needed
	TestResponse& resp = item.response;
#endif

	// timed call telling the tracker this response completed
	time_stats(internal_stats.mtx,
		   internal_stats.track_resp_time,
		   [&](){
		     service_tracker.track_resp(item.server_id, item.resp_params, item.request_cost);
		   });
	count_stats(internal_stats.mtx,
		    internal_stats.track_resp_count);

	--outstanding_ops;
	if (notify_req_cv) {
	  // let run_req push another op under its max_outstanding cap
	  cv_req.notify_one();
	}

	l.lock();
      }
    };

    // phase 1: requests are still being issued
    while(!requests_complete.load()) {
      while(resp_queue.empty() && !requests_complete.load()) {
	cv_resp.wait_for(l, delay);
      }
      proc_resp(true);
    }

    // phase 2: all requests issued; drain the remaining responses
    while(outstanding_ops.load() > 0) {
      while(resp_queue.empty() && outstanding_ops.load() > 0) {
	cv_resp.wait_for(l, delay);
      }
      proc_resp(false); // don't call notify_one as all requests are complete
    }

    // all responses received, thread ends
  }
}; // class SimulatedClient
337 | ||
338 | ||
}; // namespace qos_simulation
}; // namespace crimson