1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 * Copyright (C) 2016 Red Hat Inc.
14 #include <condition_variable>
25 namespace qos_simulation
{
29 constexpr struct req_op_t req_op
{};
30 constexpr struct wait_op_t wait_op
{};
33 enum class CliOp
{ req
, wait
};
37 std::chrono::milliseconds wait_time
;
40 std::chrono::microseconds time_bw_reqs
;
41 uint16_t max_outstanding
;
45 // D is a duration type
47 CliInst(wait_op_t
, D duration
) :
51 std::chrono::duration_cast
<std::chrono::milliseconds
>(duration
);
55 uint32_t count
, double ops_per_sec
, uint16_t max_outstanding
) :
58 args
.req_params
.count
= count
;
59 args
.req_params
.max_outstanding
= max_outstanding
;
60 uint32_t us
= uint32_t(0.5 + 1.0 / ops_per_sec
* 1000000);
61 args
.req_params
.time_bw_reqs
= std::chrono::microseconds(us
);
66 using ServerSelectFunc
= std::function
<const ServerId
&(uint64_t seed
)>;
69 template<typename SvcTrk
, typename ReqPm
, typename RespPm
, typename Accum
>
70 class SimulatedClient
{
73 struct InternalStats
{
75 std::chrono::nanoseconds track_resp_time
;
76 std::chrono::nanoseconds get_req_params_time
;
77 uint32_t track_resp_count
;
78 uint32_t get_req_params_count
;
82 get_req_params_time(0),
84 get_req_params_count(0)
91 std::function
<void(const ServerId
&,
96 using ClientAccumFunc
= std::function
<void(Accum
&,const RespPm
&)>;
98 typedef std::chrono::time_point
<std::chrono::steady_clock
> TimePoint
;
100 static TimePoint
now() { return std::chrono::steady_clock::now(); }
104 struct RespQueueItem
{
105 TestResponse response
;
111 const SubmitFunc submit_f
;
112 const ServerSelectFunc server_select_f
;
113 const ClientAccumFunc accum_f
;
115 std::vector
<CliInst
> instructions
;
117 SvcTrk service_tracker
;
119 // TODO: use lock rather than atomic???
120 std::atomic_ulong outstanding_ops
;
121 std::atomic_bool requests_complete
;
123 std::deque
<RespQueueItem
> resp_queue
;
126 std::condition_variable cv_req
;
129 std::condition_variable cv_resp
;
131 using RespGuard
= std::lock_guard
<decltype(mtx_resp
)>;
132 using Lock
= std::unique_lock
<std::mutex
>;
136 std::vector
<TimePoint
> op_times
;
138 InternalStats internal_stats
;
141 std::thread thd_resp
;
145 SimulatedClient(ClientId _id
,
146 const SubmitFunc
& _submit_f
,
147 const ServerSelectFunc
& _server_select_f
,
148 const ClientAccumFunc
& _accum_f
,
149 const std::vector
<CliInst
>& _instrs
) :
152 server_select_f(_server_select_f
),
154 instructions(_instrs
),
157 requests_complete(false)
160 for (auto i
: instructions
) {
161 if (CliOp::req
== i
.op
) {
162 op_count
+= i
.args
.req_params
.count
;
165 op_times
.reserve(op_count
);
167 thd_resp
= std::thread(&SimulatedClient::run_resp
, this);
168 thd_req
= std::thread(&SimulatedClient::run_req
, this);
172 SimulatedClient(ClientId _id
,
173 const SubmitFunc
& _submit_f
,
174 const ServerSelectFunc
& _server_select_f
,
175 const ClientAccumFunc
& _accum_f
,
176 uint16_t _ops_to_run
,
178 uint16_t _outstanding_ops_allowed
) :
180 _submit_f
, _server_select_f
, _accum_f
,
181 {{req_op
, _ops_to_run
, _iops_goal
, _outstanding_ops_allowed
}})
187 SimulatedClient(const SimulatedClient
&) = delete;
188 SimulatedClient(SimulatedClient
&&) = delete;
189 SimulatedClient
& operator=(const SimulatedClient
&) = delete;
190 SimulatedClient
& operator=(SimulatedClient
&&) = delete;
192 virtual ~SimulatedClient() {
196 void receive_response(const TestResponse
& resp
,
197 const ServerId
& server_id
,
198 const RespPm
& resp_params
) {
199 RespGuard
g(mtx_resp
);
200 resp_queue
.push_back(RespQueueItem
{resp
, server_id
, resp_params
});
201 cv_resp
.notify_one();
204 const std::vector
<TimePoint
>& get_op_times() const { return op_times
; }
206 void wait_until_done() {
207 if (thd_req
.joinable()) thd_req
.join();
208 if (thd_resp
.joinable()) thd_resp
.join();
211 const Accum
& get_accumulator() const { return accumulator
; }
213 const InternalStats
& get_internal_stats() const { return internal_stats
; }
218 size_t ops_count
= 0;
219 for (auto i
: instructions
) {
220 if (CliOp::wait
== i
.op
) {
221 std::this_thread::sleep_for(i
.args
.wait_time
);
222 } else if (CliOp::req
== i
.op
) {
224 for (uint64_t o
= 0; o
< i
.args
.req_params
.count
; ++o
) {
225 while (outstanding_ops
>= i
.args
.req_params
.max_outstanding
) {
230 auto now
= std::chrono::steady_clock::now();
231 const ServerId
& server
= server_select_f(o
);
234 time_stats_w_return
<decltype(internal_stats
.get_req_params_time
),
235 ReqPm
>(internal_stats
.mtx
,
236 internal_stats
.get_req_params_time
,
238 return service_tracker
.get_req_params(server
);
240 count_stats(internal_stats
.mtx
,
241 internal_stats
.get_req_params_count
);
243 TestRequest
req(server
, o
, 12);
244 submit_f(server
, req
, id
, rp
);
246 l
.lock(); // lock for return to top of loop
248 auto delay_time
= now
+ i
.args
.req_params
.time_bw_reqs
;
249 while (std::chrono::steady_clock::now() < delay_time
) {
250 cv_req
.wait_until(l
, delay_time
);
253 ops_count
+= i
.args
.req_params
.count
;
259 requests_complete
= true;
261 // all requests made, thread ends
266 std::chrono::milliseconds
delay(1000);
271 // since the following code would otherwise be repeated (except for
272 // the call to notify_one) in the two loops below; let's avoid
273 // repetition and define it once.
274 const auto proc_resp
= [this, &op
, &l
](const bool notify_req_cv
) {
275 if (!resp_queue
.empty()) {
276 RespQueueItem item
= resp_queue
.front();
277 resp_queue
.pop_front();
283 op_times
.push_back(now());
284 accum_f(accumulator
, item
.resp_params
);
289 TestResponse
& resp
= item
.response
;
292 time_stats(internal_stats
.mtx
,
293 internal_stats
.track_resp_time
,
295 service_tracker
.track_resp(item
.server_id
, item
.resp_params
);
297 count_stats(internal_stats
.mtx
,
298 internal_stats
.track_resp_count
);
309 while(!requests_complete
.load()) {
310 while(resp_queue
.empty() && !requests_complete
.load()) {
311 cv_resp
.wait_for(l
, delay
);
316 while(outstanding_ops
.load() > 0) {
317 while(resp_queue
.empty() && outstanding_ops
.load() > 0) {
318 cv_resp
.wait_for(l
, delay
);
320 proc_resp(false); // don't call notify_one as all requests are complete
323 // all responses received, thread ends
325 }; // class SimulatedClient
328 }; // namespace qos_simulation
329 }; // namespace crimson