]> git.proxmox.com Git - ceph.git/blob - ceph/src/dmclock/sim/src/sim_client.h
update sources to v12.1.3
[ceph.git] / ceph / src / dmclock / sim / src / sim_client.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 /*
5 * Copyright (C) 2016 Red Hat Inc.
6 */
7
8
9 #pragma once
10
11
12 #include <atomic>
13 #include <mutex>
14 #include <condition_variable>
15 #include <thread>
16 #include <chrono>
17 #include <vector>
18 #include <deque>
19 #include <iostream>
20
21 #include "sim_recs.h"
22
23
24 namespace crimson {
25 namespace qos_simulation {
26
// Empty tag types used only to select a CliInst constructor overload.
// Each tag is declared next to the ready-made constant callers pass in.

// Selects the CliInst constructor that builds a request instruction.
struct req_op_t {};
constexpr req_op_t req_op {};

// Selects the CliInst constructor that builds a wait instruction.
struct wait_op_t {};
constexpr wait_op_t wait_op {};
31
32
// Kinds of instruction a simulated client can execute; stored in
// CliInst::op to say which member of CliInst::args is meaningful.
enum class CliOp {
  req,  // issue a batch of requests (parameters in args.req_params)
  wait  // pause for a fixed time (duration in args.wait_time)
};
34 struct CliInst {
35 CliOp op;
36 union {
37 std::chrono::milliseconds wait_time;
38 struct {
39 uint32_t count;
40 std::chrono::microseconds time_bw_reqs;
41 uint16_t max_outstanding;
42 } req_params;
43 } args;
44
45 // D is a duration type
46 template<typename D>
47 CliInst(wait_op_t, D duration) :
48 op(CliOp::wait)
49 {
50 args.wait_time =
51 std::chrono::duration_cast<std::chrono::milliseconds>(duration);
52 }
53
54 CliInst(req_op_t,
55 uint32_t count, double ops_per_sec, uint16_t max_outstanding) :
56 op(CliOp::req)
57 {
58 args.req_params.count = count;
59 args.req_params.max_outstanding = max_outstanding;
60 uint32_t us = uint32_t(0.5 + 1.0 / ops_per_sec * 1000000);
61 args.req_params.time_bw_reqs = std::chrono::microseconds(us);
62 }
63 };
64
65
66 using ServerSelectFunc = std::function<const ServerId&(uint64_t seed)>;
67
68
69 template<typename SvcTrk, typename ReqPm, typename RespPm, typename Accum>
70 class SimulatedClient {
71 public:
72
73 struct InternalStats {
74 std::mutex mtx;
75 std::chrono::nanoseconds track_resp_time;
76 std::chrono::nanoseconds get_req_params_time;
77 uint32_t track_resp_count;
78 uint32_t get_req_params_count;
79
80 InternalStats() :
81 track_resp_time(0),
82 get_req_params_time(0),
83 track_resp_count(0),
84 get_req_params_count(0)
85 {
86 // empty
87 }
88 };
89
90 using SubmitFunc =
91 std::function<void(const ServerId&,
92 TestRequest&&,
93 const ClientId&,
94 const ReqPm&)>;
95
96 using ClientAccumFunc = std::function<void(Accum&,const RespPm&)>;
97
98 typedef std::chrono::time_point<std::chrono::steady_clock> TimePoint;
99
100 static TimePoint now() { return std::chrono::steady_clock::now(); }
101
102 protected:
103
104 struct RespQueueItem {
105 TestResponse response;
106 ServerId server_id;
107 RespPm resp_params;
108 };
109
110 const ClientId id;
111 const SubmitFunc submit_f;
112 const ServerSelectFunc server_select_f;
113 const ClientAccumFunc accum_f;
114
115 std::vector<CliInst> instructions;
116
117 SvcTrk service_tracker;
118
119 // TODO: use lock rather than atomic???
120 std::atomic_ulong outstanding_ops;
121 std::atomic_bool requests_complete;
122
123 std::deque<RespQueueItem> resp_queue;
124
125 std::mutex mtx_req;
126 std::condition_variable cv_req;
127
128 std::mutex mtx_resp;
129 std::condition_variable cv_resp;
130
131 using RespGuard = std::lock_guard<decltype(mtx_resp)>;
132 using Lock = std::unique_lock<std::mutex>;
133
134 // data collection
135
136 std::vector<TimePoint> op_times;
137 Accum accumulator;
138 InternalStats internal_stats;
139
140 std::thread thd_req;
141 std::thread thd_resp;
142
143 public:
144
145 SimulatedClient(ClientId _id,
146 const SubmitFunc& _submit_f,
147 const ServerSelectFunc& _server_select_f,
148 const ClientAccumFunc& _accum_f,
149 const std::vector<CliInst>& _instrs) :
150 id(_id),
151 submit_f(_submit_f),
152 server_select_f(_server_select_f),
153 accum_f(_accum_f),
154 instructions(_instrs),
155 service_tracker(),
156 outstanding_ops(0),
157 requests_complete(false)
158 {
159 size_t op_count = 0;
160 for (auto i : instructions) {
161 if (CliOp::req == i.op) {
162 op_count += i.args.req_params.count;
163 }
164 }
165 op_times.reserve(op_count);
166
167 thd_resp = std::thread(&SimulatedClient::run_resp, this);
168 thd_req = std::thread(&SimulatedClient::run_req, this);
169 }
170
171
172 SimulatedClient(ClientId _id,
173 const SubmitFunc& _submit_f,
174 const ServerSelectFunc& _server_select_f,
175 const ClientAccumFunc& _accum_f,
176 uint16_t _ops_to_run,
177 double _iops_goal,
178 uint16_t _outstanding_ops_allowed) :
179 SimulatedClient(_id,
180 _submit_f, _server_select_f, _accum_f,
181 {{req_op, _ops_to_run, _iops_goal, _outstanding_ops_allowed}})
182 {
183 // empty
184 }
185
186
187 SimulatedClient(const SimulatedClient&) = delete;
188 SimulatedClient(SimulatedClient&&) = delete;
189 SimulatedClient& operator=(const SimulatedClient&) = delete;
190 SimulatedClient& operator=(SimulatedClient&&) = delete;
191
192 virtual ~SimulatedClient() {
193 wait_until_done();
194 }
195
196 void receive_response(const TestResponse& resp,
197 const ServerId& server_id,
198 const RespPm& resp_params) {
199 RespGuard g(mtx_resp);
200 resp_queue.push_back(RespQueueItem{resp, server_id, resp_params});
201 cv_resp.notify_one();
202 }
203
204 const std::vector<TimePoint>& get_op_times() const { return op_times; }
205
206 void wait_until_done() {
207 if (thd_req.joinable()) thd_req.join();
208 if (thd_resp.joinable()) thd_resp.join();
209 }
210
211 const Accum& get_accumulator() const { return accumulator; }
212
213 const InternalStats& get_internal_stats() const { return internal_stats; }
214
215 protected:
216
217 void run_req() {
218 size_t ops_count = 0;
219 for (auto i : instructions) {
220 if (CliOp::wait == i.op) {
221 std::this_thread::sleep_for(i.args.wait_time);
222 } else if (CliOp::req == i.op) {
223 Lock l(mtx_req);
224 for (uint64_t o = 0; o < i.args.req_params.count; ++o) {
225 while (outstanding_ops >= i.args.req_params.max_outstanding) {
226 cv_req.wait(l);
227 }
228
229 l.unlock();
230 auto now = std::chrono::steady_clock::now();
231 const ServerId& server = server_select_f(o);
232
233 ReqPm rp =
234 time_stats_w_return<decltype(internal_stats.get_req_params_time),
235 ReqPm>(internal_stats.mtx,
236 internal_stats.get_req_params_time,
237 [&]() -> ReqPm {
238 return service_tracker.get_req_params(server);
239 });
240 count_stats(internal_stats.mtx,
241 internal_stats.get_req_params_count);
242
243 submit_f(server,
244 TestRequest{server, static_cast<uint32_t>(o), 12},
245 id, rp);
246 ++outstanding_ops;
247 l.lock(); // lock for return to top of loop
248
249 auto delay_time = now + i.args.req_params.time_bw_reqs;
250 while (std::chrono::steady_clock::now() < delay_time) {
251 cv_req.wait_until(l, delay_time);
252 } // while
253 } // for
254 ops_count += i.args.req_params.count;
255 } else {
256 assert(false);
257 }
258 } // for loop
259
260 requests_complete = true;
261
262 // all requests made, thread ends
263 }
264
265
266 void run_resp() {
267 std::chrono::milliseconds delay(1000);
268 int op = 0;
269
270 Lock l(mtx_resp);
271
272 // since the following code would otherwise be repeated (except for
273 // the call to notify_one) in the two loops below; let's avoid
274 // repetition and define it once.
275 const auto proc_resp = [this, &op, &l](const bool notify_req_cv) {
276 if (!resp_queue.empty()) {
277 RespQueueItem item = resp_queue.front();
278 resp_queue.pop_front();
279
280 l.unlock();
281
282 // data collection
283
284 op_times.push_back(now());
285 accum_f(accumulator, item.resp_params);
286
287 // processing
288
289 #if 0 // not needed
290 TestResponse& resp = item.response;
291 #endif
292
293 time_stats(internal_stats.mtx,
294 internal_stats.track_resp_time,
295 [&](){
296 service_tracker.track_resp(item.server_id, item.resp_params);
297 });
298 count_stats(internal_stats.mtx,
299 internal_stats.track_resp_count);
300
301 --outstanding_ops;
302 if (notify_req_cv) {
303 cv_req.notify_one();
304 }
305
306 l.lock();
307 }
308 };
309
310 while(!requests_complete.load()) {
311 while(resp_queue.empty() && !requests_complete.load()) {
312 cv_resp.wait_for(l, delay);
313 }
314 proc_resp(true);
315 }
316
317 while(outstanding_ops.load() > 0) {
318 while(resp_queue.empty() && outstanding_ops.load() > 0) {
319 cv_resp.wait_for(l, delay);
320 }
321 proc_resp(false); // don't call notify_one as all requests are complete
322 }
323
324 // all responses received, thread ends
325 }
326 }; // class SimulatedClient
327
328
329 }; // namespace qos_simulation
330 }; // namespace crimson