]> git.proxmox.com Git - ceph.git/blob - ceph/src/dmclock/sim/src/sim_client.h
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / dmclock / sim / src / sim_client.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 /*
5 * Copyright (C) 2016 Red Hat Inc.
6 */
7
8
9 #pragma once
10
11
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <functional>
#include <iostream>
#include <limits>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

#include "sim_recs.h"
22
23
24 namespace crimson {
25 namespace qos_simulation {
26
// Tag types (and ready-made tag values) used to pick which CliInst
// constructor is invoked.
struct req_op_t {};
struct wait_op_t {};
constexpr struct req_op_t req_op {};
constexpr struct wait_op_t wait_op {};


// The kind of instruction a CliInst holds; it also tells which member
// of CliInst::args is meaningful.
enum class CliOp { req, wait };

/**
 * One instruction for a simulated client: either "wait for a while"
 * or "issue a batch of requests".
 *
 * `op` discriminates the `args` union:
 *  - CliOp::wait -> args.wait_time
 *  - CliOp::req  -> args.req_params
 */
struct CliInst {
  CliOp op;
  union {
    // valid when op == CliOp::wait
    std::chrono::milliseconds wait_time;
    // valid when op == CliOp::req
    struct {
      uint32_t count;                         // number of requests
      std::chrono::microseconds time_bw_reqs; // spacing between requests
      uint16_t max_outstanding;               // cap on in-flight requests
    } req_params;
  } args;

  /**
   * Build a wait instruction.
   *
   * @tparam D       a std::chrono duration type
   * @param duration how long to wait; converted to milliseconds with
   *                 duration_cast, so finer precision is truncated
   */
  template<typename D>
  CliInst(wait_op_t, D duration) :
    op(CliOp::wait)
  {
    args.wait_time =
      std::chrono::duration_cast<std::chrono::milliseconds>(duration);
  }

  /**
   * Build a request instruction.
   *
   * @param count           number of requests to issue
   * @param ops_per_sec     target request rate; must be greater than zero
   * @param max_outstanding maximum number of requests in flight
   * @throws std::invalid_argument if ops_per_sec is zero, negative or
   *         NaN, or so small that the resulting spacing does not fit in
   *         std::chrono::microseconds
   */
  CliInst(req_op_t,
          uint32_t count, double ops_per_sec, uint16_t max_outstanding) :
    op(CliOp::req)
  {
    // Written as a negated comparison so that NaN is rejected as well.
    if (!(ops_per_sec > 0.0)) {
      throw std::invalid_argument("CliInst: ops_per_sec must be positive");
    }

    // Spacing between requests in microseconds; the 0.5 makes the
    // truncating conversion below round to nearest.
    const double us = 0.5 + 1.0 / ops_per_sec * 1000000;

    // Convert through the duration's own (wide) representation rather
    // than a 32-bit intermediate, so slow rates are not truncated, and
    // refuse values whose conversion to an integer would be undefined.
    using UsRep = std::chrono::microseconds::rep;
    if (!(us < static_cast<double>(std::numeric_limits<UsRep>::max()))) {
      throw std::invalid_argument("CliInst: ops_per_sec is too small");
    }

    args.req_params.count = count;
    args.req_params.max_outstanding = max_outstanding;
    args.req_params.time_bw_reqs =
      std::chrono::microseconds(static_cast<UsRep>(us));
  }
};
64
65
66 using ServerSelectFunc = std::function<const ServerId&(uint64_t seed)>;
67
68
69 template<typename SvcTrk, typename ReqPm, typename RespPm, typename Accum>
70 class SimulatedClient {
71 public:
72
73 struct InternalStats {
74 std::mutex mtx;
75 std::chrono::nanoseconds track_resp_time;
76 std::chrono::nanoseconds get_req_params_time;
77 uint32_t track_resp_count;
78 uint32_t get_req_params_count;
79
80 InternalStats() :
81 track_resp_time(0),
82 get_req_params_time(0),
83 track_resp_count(0),
84 get_req_params_count(0)
85 {
86 // empty
87 }
88 };
89
90 using SubmitFunc =
91 std::function<void(const ServerId&,
92 const TestRequest&,
93 const ClientId&,
94 const ReqPm&)>;
95
96 using ClientAccumFunc = std::function<void(Accum&,const RespPm&)>;
97
98 typedef std::chrono::time_point<std::chrono::steady_clock> TimePoint;
99
100 static TimePoint now() { return std::chrono::steady_clock::now(); }
101
102 protected:
103
104 struct RespQueueItem {
105 TestResponse response;
106 ServerId server_id;
107 RespPm resp_params;
108 };
109
110 const ClientId id;
111 const SubmitFunc submit_f;
112 const ServerSelectFunc server_select_f;
113 const ClientAccumFunc accum_f;
114
115 std::vector<CliInst> instructions;
116
117 SvcTrk service_tracker;
118
119 // TODO: use lock rather than atomic???
120 std::atomic_ulong outstanding_ops;
121 std::atomic_bool requests_complete;
122
123 std::deque<RespQueueItem> resp_queue;
124
125 std::mutex mtx_req;
126 std::condition_variable cv_req;
127
128 std::mutex mtx_resp;
129 std::condition_variable cv_resp;
130
131 using RespGuard = std::lock_guard<decltype(mtx_resp)>;
132 using Lock = std::unique_lock<std::mutex>;
133
134 // data collection
135
136 std::vector<TimePoint> op_times;
137 Accum accumulator;
138 InternalStats internal_stats;
139
140 std::thread thd_req;
141 std::thread thd_resp;
142
143 public:
144
145 SimulatedClient(ClientId _id,
146 const SubmitFunc& _submit_f,
147 const ServerSelectFunc& _server_select_f,
148 const ClientAccumFunc& _accum_f,
149 const std::vector<CliInst>& _instrs) :
150 id(_id),
151 submit_f(_submit_f),
152 server_select_f(_server_select_f),
153 accum_f(_accum_f),
154 instructions(_instrs),
155 service_tracker(),
156 outstanding_ops(0),
157 requests_complete(false)
158 {
159 size_t op_count = 0;
160 for (auto i : instructions) {
161 if (CliOp::req == i.op) {
162 op_count += i.args.req_params.count;
163 }
164 }
165 op_times.reserve(op_count);
166
167 thd_resp = std::thread(&SimulatedClient::run_resp, this);
168 thd_req = std::thread(&SimulatedClient::run_req, this);
169 }
170
171
172 SimulatedClient(ClientId _id,
173 const SubmitFunc& _submit_f,
174 const ServerSelectFunc& _server_select_f,
175 const ClientAccumFunc& _accum_f,
176 uint16_t _ops_to_run,
177 double _iops_goal,
178 uint16_t _outstanding_ops_allowed) :
179 SimulatedClient(_id,
180 _submit_f, _server_select_f, _accum_f,
181 {{req_op, _ops_to_run, _iops_goal, _outstanding_ops_allowed}})
182 {
183 // empty
184 }
185
186
187 SimulatedClient(const SimulatedClient&) = delete;
188 SimulatedClient(SimulatedClient&&) = delete;
189 SimulatedClient& operator=(const SimulatedClient&) = delete;
190 SimulatedClient& operator=(SimulatedClient&&) = delete;
191
192 virtual ~SimulatedClient() {
193 wait_until_done();
194 }
195
196 void receive_response(const TestResponse& resp,
197 const ServerId& server_id,
198 const RespPm& resp_params) {
199 RespGuard g(mtx_resp);
200 resp_queue.push_back(RespQueueItem{resp, server_id, resp_params});
201 cv_resp.notify_one();
202 }
203
204 const std::vector<TimePoint>& get_op_times() const { return op_times; }
205
206 void wait_until_done() {
207 if (thd_req.joinable()) thd_req.join();
208 if (thd_resp.joinable()) thd_resp.join();
209 }
210
211 const Accum& get_accumulator() const { return accumulator; }
212
213 const InternalStats& get_internal_stats() const { return internal_stats; }
214
215 protected:
216
217 void run_req() {
218 size_t ops_count = 0;
219 for (auto i : instructions) {
220 if (CliOp::wait == i.op) {
221 std::this_thread::sleep_for(i.args.wait_time);
222 } else if (CliOp::req == i.op) {
223 Lock l(mtx_req);
224 for (uint64_t o = 0; o < i.args.req_params.count; ++o) {
225 while (outstanding_ops >= i.args.req_params.max_outstanding) {
226 cv_req.wait(l);
227 }
228
229 l.unlock();
230 auto now = std::chrono::steady_clock::now();
231 const ServerId& server = server_select_f(o);
232
233 ReqPm rp =
234 time_stats_w_return<decltype(internal_stats.get_req_params_time),
235 ReqPm>(internal_stats.mtx,
236 internal_stats.get_req_params_time,
237 [&]() -> ReqPm {
238 return service_tracker.get_req_params(server);
239 });
240 count_stats(internal_stats.mtx,
241 internal_stats.get_req_params_count);
242
243 TestRequest req(server, o, 12);
244 submit_f(server, req, id, rp);
245 ++outstanding_ops;
246 l.lock(); // lock for return to top of loop
247
248 auto delay_time = now + i.args.req_params.time_bw_reqs;
249 while (std::chrono::steady_clock::now() < delay_time) {
250 cv_req.wait_until(l, delay_time);
251 } // while
252 } // for
253 ops_count += i.args.req_params.count;
254 } else {
255 assert(false);
256 }
257 } // for loop
258
259 requests_complete = true;
260
261 // all requests made, thread ends
262 }
263
264
265 void run_resp() {
266 std::chrono::milliseconds delay(1000);
267 int op = 0;
268
269 Lock l(mtx_resp);
270
271 // since the following code would otherwise be repeated (except for
272 // the call to notify_one) in the two loops below; let's avoid
273 // repetition and define it once.
274 const auto proc_resp = [this, &op, &l](const bool notify_req_cv) {
275 if (!resp_queue.empty()) {
276 RespQueueItem item = resp_queue.front();
277 resp_queue.pop_front();
278
279 l.unlock();
280
281 // data collection
282
283 op_times.push_back(now());
284 accum_f(accumulator, item.resp_params);
285
286 // processing
287
288 #if 0 // not needed
289 TestResponse& resp = item.response;
290 #endif
291
292 time_stats(internal_stats.mtx,
293 internal_stats.track_resp_time,
294 [&](){
295 service_tracker.track_resp(item.server_id, item.resp_params);
296 });
297 count_stats(internal_stats.mtx,
298 internal_stats.track_resp_count);
299
300 --outstanding_ops;
301 if (notify_req_cv) {
302 cv_req.notify_one();
303 }
304
305 l.lock();
306 }
307 };
308
309 while(!requests_complete.load()) {
310 while(resp_queue.empty() && !requests_complete.load()) {
311 cv_resp.wait_for(l, delay);
312 }
313 proc_resp(true);
314 }
315
316 while(outstanding_ops.load() > 0) {
317 while(resp_queue.empty() && outstanding_ops.load() > 0) {
318 cv_resp.wait_for(l, delay);
319 }
320 proc_resp(false); // don't call notify_one as all requests are complete
321 }
322
323 // all responses received, thread ends
324 }
325 }; // class SimulatedClient
326
327
328 }; // namespace qos_simulation
329 }; // namespace crimson