1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 * Copyright (C) 2016 Red Hat Inc.
7 * Author: J. Eric Ivancich <ivancich@redhat.com>
9 * This is free software; you can redistribute it and/or modify it
10 * under the terms of the GNU Lesser General Public License version
11 * 2.1, as published by the Free Software Foundation. See file
21 #include <condition_variable>
32 namespace qos_simulation
{
// Tag values used to pick between CliInst's "issue requests" and "wait"
// constructor overloads (the tag structs carry no data; their definitions
// are on lines missing from this view — TODO confirm).
36 constexpr struct req_op_t req_op
{};
37 constexpr struct wait_op_t wait_op
{};
// Kind of instruction a simulated client can execute: issue a batch of
// requests, or pause for a fixed period.
enum class CliOp { req, wait };
// NOTE(review): everything below is a fragment of struct CliInst; the
// enclosing union/struct declaration and both constructor headers sit on
// lines missing from this view, so comments hedge where needed.
// Pause duration used when op is CliOp::wait.
44 std::chrono::milliseconds wait_time
;
// Gap between consecutive requests of a CliOp::req instruction.
47 std::chrono::microseconds time_bw_reqs
;
// Cap on concurrently outstanding requests of a CliOp::req instruction.
48 uint16_t max_outstanding
;
52 // D is a duration type
// "wait" constructor: accepts any chrono duration and (in the body
// below) converts it to milliseconds for storage.
54 CliInst(wait_op_t
, D duration
) :
58 std::chrono::duration_cast
<std::chrono::milliseconds
>(duration
);
// "req" constructor (header line missing here): run `count` requests at
// roughly ops_per_sec with at most max_outstanding in flight.
62 uint32_t count
, double ops_per_sec
, uint16_t max_outstanding
) :
65 args
.req_params
.count
= count
;
66 args
.req_params
.max_outstanding
= max_outstanding
;
// Inter-request gap in whole microseconds; the +0.5 rounds to nearest.
67 uint32_t us
= uint32_t(0.5 + 1.0 / ops_per_sec
* 1000000);
68 args
.req_params
.time_bw_reqs
= std::chrono::microseconds(us
);
73 using ServerSelectFunc
= std::function
<const ServerId
&(uint64_t seed
)>;
// Simulates one client: a request thread (run_req) replays a CliInst
// script against servers while a response thread (run_resp) drains the
// response queue, records timings, and folds results into an Accum.
// Template parameters: SvcTrk = service tracker type, ReqPm/RespPm =
// request/response parameter types, Accum = accumulator type.
76 template<typename SvcTrk
, typename ReqPm
, typename RespPm
, typename Accum
>
77 class SimulatedClient
{
// Aggregate timing/count statistics for the two tracked operations
// (track_resp and get_req_params). NOTE(review): a mutex member and the
// constructor header are on lines missing from this view.
80 struct InternalStats
{
// Total time spent in service_tracker.track_resp calls.
82 std::chrono::nanoseconds track_resp_time
;
// Total time spent in service_tracker.get_req_params calls.
83 std::chrono::nanoseconds get_req_params_time
;
84 uint32_t track_resp_count
;
85 uint32_t get_req_params_count
;
// Constructor fragment: zero-initializes the counters/timers.
89 get_req_params_time(0),
91 get_req_params_count(0)
// Fragment of the SubmitFunc alias: a std::function whose first
// parameter is a const ServerId&; the remaining signature lines are
// missing from this view — TODO confirm against the original header.
98 std::function
<void(const ServerId
&,
103 using ClientAccumFunc
= std::function
<void(Accum
&,const RespPm
&)>;
// Steady-clock time point used for all client-side timing.
// Modernized: `using` alias instead of `typedef`, consistent with the
// other aliases in this file (ServerSelectFunc, RespGuard, Lock, ...).
using TimePoint = std::chrono::time_point<std::chrono::steady_clock>;

// Current time on the steady (monotonic) clock.
static TimePoint now() { return std::chrono::steady_clock::now(); }
// One queued response awaiting processing by run_resp. The remaining
// members (server id, response params, request cost — see the aggregate
// initialization in receive_response) are on lines missing from this view.
111 struct RespQueueItem
{
112 TestResponse response
;
// --- Callbacks supplied by the simulation harness ---
119 const SubmitFunc submit_f
;
120 const ServerSelectFunc server_select_f
;
121 const ClientAccumFunc accum_f
;
// Script of req/wait instructions this client executes in run_req.
123 std::vector
<CliInst
> instructions
;
125 SvcTrk service_tracker
;
127 // TODO: use lock rather than atomic???
// Requests submitted but not yet answered (appears to gate run_req's
// max_outstanding throttle — see the while loop there).
128 std::atomic_ulong outstanding_ops
;
// Set true by run_req once every instruction has been executed.
129 std::atomic_bool requests_complete
;
// Responses handed in by receive_response, consumed by run_resp.
131 std::deque
<RespQueueItem
> resp_queue
;
134 std::condition_variable cv_req
;
137 std::condition_variable cv_resp
;
// RAII lock helpers; mtx_resp is declared on a line missing from view.
139 using RespGuard
= std::lock_guard
<decltype(mtx_resp
)>;
140 using Lock
= std::unique_lock
<std::mutex
>;
// One completion timestamp per finished operation (see run_resp).
144 std::vector
<TimePoint
> op_times
;
146 InternalStats internal_stats
;
// Worker threads; started by the constructor, joined in wait_until_done.
149 std::thread thd_resp
;
// Primary constructor (several initializer-list lines are missing from
// this view): stores the callbacks and instruction script, pre-counts
// the total requests so op_times can be reserved, then starts the
// response and request worker threads.
153 SimulatedClient(ClientId _id
,
154 const SubmitFunc
& _submit_f
,
155 const ServerSelectFunc
& _server_select_f
,
156 const ClientAccumFunc
& _accum_f
,
157 const std::vector
<CliInst
>& _instrs
) :
160 server_select_f(_server_select_f
),
162 instructions(_instrs
),
165 requests_complete(false)
// NOTE(review): `for (auto i : instructions)` copies each CliInst per
// iteration; `const auto&` would avoid the copy. Flagged only — code
// left byte-identical because surrounding lines are missing.
168 for (auto i
: instructions
) {
169 if (CliOp::req
== i
.op
) {
170 op_count
+= i
.args
.req_params
.count
;
// Reserve once so op_times never reallocates while timing operations.
173 op_times
.reserve(op_count
);
// Start the response consumer before the request producer.
175 thd_resp
= std::thread(&SimulatedClient::run_resp
, this);
176 thd_req
= std::thread(&SimulatedClient::run_req
, this);
// Convenience constructor: builds a one-instruction script containing a
// single req_op entry. NOTE(review): the `_iops_goal` parameter line and
// the delegation to the primary constructor are on lines missing from
// this view — TODO confirm.
180 SimulatedClient(ClientId _id
,
181 const SubmitFunc
& _submit_f
,
182 const ServerSelectFunc
& _server_select_f
,
183 const ClientAccumFunc
& _accum_f
,
184 uint16_t _ops_to_run
,
186 uint16_t _outstanding_ops_allowed
) :
188 _submit_f
, _server_select_f
, _accum_f
,
// Brace-init of the single-element instruction vector.
189 {{req_op
, _ops_to_run
, _iops_goal
, _outstanding_ops_allowed
}})
195 SimulatedClient(const SimulatedClient
&) = delete;
196 SimulatedClient(SimulatedClient
&&) = delete;
197 SimulatedClient
& operator=(const SimulatedClient
&) = delete;
198 SimulatedClient
& operator=(SimulatedClient
&&) = delete;
// Destructor; its body is on lines missing from this view — presumably
// it joins the worker threads (cf. wait_until_done). TODO confirm.
200 virtual ~SimulatedClient() {
204 void receive_response(const TestResponse
& resp
,
205 const ServerId
& server_id
,
206 const RespPm
& resp_params
,
207 const Cost request_cost
) {
208 RespGuard
g(mtx_resp
);
209 resp_queue
.push_back(
210 RespQueueItem
{ resp
, server_id
, resp_params
, request_cost
});
211 cv_resp
.notify_one();
214 const std::vector
<TimePoint
>& get_op_times() const { return op_times
; }
216 void wait_until_done() {
217 if (thd_req
.joinable()) thd_req
.join();
218 if (thd_resp
.joinable()) thd_resp
.join();
221 const Accum
& get_accumulator() const { return accumulator
; }
223 const InternalStats
& get_internal_stats() const { return internal_stats
; }
// run_req worker-thread body (heavily gapped in this view: the function
// header, lock acquisition for `l`, the submit_f call, and most closing
// braces are on missing lines). It replays each CliInst: sleeping for
// wait instructions, and for req instructions pacing submissions by
// time_bw_reqs while honoring max_outstanding.
228 size_t ops_count
= 0;
// NOTE(review): `for (auto i : instructions)` copies each CliInst;
// `const auto&` would avoid the copy. Flagged only — code unchanged.
229 for (auto i
: instructions
) {
230 if (CliOp::wait
== i
.op
) {
// Wait instruction: just sleep for the scripted duration.
231 std::this_thread::sleep_for(i
.args
.wait_time
);
232 } else if (CliOp::req
== i
.op
) {
234 for (uint64_t o
= 0; o
< i
.args
.req_params
.count
; ++o
) {
// Throttle: spin/wait while too many requests are still in flight.
235 while (outstanding_ops
>= i
.args
.req_params
.max_outstanding
) {
240 auto now
= std::chrono::steady_clock::now();
// The op index doubles as the seed for server selection.
241 const ServerId
& server
= server_select_f(o
);
// Time the service_tracker.get_req_params call under the stats mutex.
244 time_stats_w_return
<decltype(internal_stats
.get_req_params_time
),
245 ReqPm
>(internal_stats
.mtx
,
246 internal_stats
.get_req_params_time
,
248 return service_tracker
.get_req_params(server
);
250 count_stats(internal_stats
.mtx
,
251 internal_stats
.get_req_params_count
);
// Request payload: target server, op index, and a fixed size of 12
// (meaning of the constant not visible here — TODO confirm).
254 TestRequest
{server
, static_cast<uint32_t>(o
), 12},
257 l
.lock(); // lock for return to top of loop
// Pace the next submission: wait until time_bw_reqs has elapsed.
259 auto delay_time
= now
+ i
.args
.req_params
.time_bw_reqs
;
260 while (std::chrono::steady_clock::now() < delay_time
) {
261 cv_req
.wait_until(l
, delay_time
);
264 ops_count
+= i
.args
.req_params
.count
;
// Signal run_resp that no further requests will be produced.
270 requests_complete
= true;
272 // all requests made, thread ends
// run_resp worker-thread body (gapped in this view: the function header
// and the declarations of `op` and lock `l` captured below are on
// missing lines). It drains resp_queue in two phases: while requests
// are still being produced, then until all outstanding ops complete.
// Condvar waits are bounded by this timeout so the loops can re-check
// their exit conditions even without a notification.
277 std::chrono::milliseconds
delay(1000);
282 // since the following code would otherwise be repeated (except for
283 // the call to notify_one) in the two loops below; let's avoid
284 // repetition and define it once.
// Pops one queued response (if any), records its completion time,
// feeds it to accum_f, and times the service_tracker.track_resp call.
285 const auto proc_resp
= [this, &op
, &l
](const bool notify_req_cv
) {
286 if (!resp_queue
.empty()) {
287 RespQueueItem item
= resp_queue
.front();
288 resp_queue
.pop_front();
294 op_times
.push_back(now());
295 accum_f(accumulator
, item
.resp_params
);
300 TestResponse
& resp
= item
.response
;
// Time the track_resp call under the stats mutex.
303 time_stats(internal_stats
.mtx
,
304 internal_stats
.track_resp_time
,
306 service_tracker
.track_resp(item
.server_id
, item
.resp_params
, item
.request_cost
);
308 count_stats(internal_stats
.mtx
,
309 internal_stats
.track_resp_count
);
// Phase 1: drain responses while run_req is still producing requests.
320 while(!requests_complete
.load()) {
321 while(resp_queue
.empty() && !requests_complete
.load()) {
322 cv_resp
.wait_for(l
, delay
);
// Phase 2: all requests made; drain until none remain outstanding.
327 while(outstanding_ops
.load() > 0) {
328 while(resp_queue
.empty() && outstanding_ops
.load() > 0) {
329 cv_resp
.wait_for(l
, delay
);
331 proc_resp(false); // don't call notify_one as all requests are complete
334 // all responses received, thread ends
336 }; // class SimulatedClient
339 }; // namespace qos_simulation
340 }; // namespace crimson