// Extracted from the git.proxmox.com gitweb view of ceph.git, blob
// ceph/src/dmclock/sim/src/sim_client.h (page titled "bump version to
// 18.2.2-pve1").
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

/*
 * Copyright (C) 2016 Red Hat Inc.
 *
 * Author: J. Eric Ivancich <ivancich@redhat.com>
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License version
 * 2.1, as published by the Free Software Foundation. See file
 * COPYING.
 */
14
15
16 #pragma once
17
18
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <functional>
#include <iostream>
#include <limits>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

#include "sim_recs.h"
29
30
31 namespace crimson {
32 namespace qos_simulation {
33
// Tag types that pick which CliInst constructor is used; callers pass
// the constants req_op / wait_op as the first constructor argument.
struct req_op_t {};
struct wait_op_t {};
constexpr req_op_t req_op{};
constexpr wait_op_t wait_op{};


// Which kind of instruction a CliInst carries.
enum class CliOp { req, wait };

// One step of a simulated client's script: either pause for a fixed
// time (CliOp::wait) or issue a batch of requests (CliOp::req). `op`
// tells which member of `args` was filled in by the constructor.
struct CliInst {
  CliOp op;
  union {
    // set when op == CliOp::wait: how long the client pauses
    std::chrono::milliseconds wait_time;
    // set when op == CliOp::req
    struct {
      uint32_t count;                         // number of requests to issue
      std::chrono::microseconds time_bw_reqs; // pacing interval between requests
      uint16_t max_outstanding;               // cap on requests awaiting a response
    } req_params;
  } args;

  // Builds a wait instruction. D is a duration type; the value is
  // converted to whole milliseconds with duration_cast (truncating).
  template<typename D>
  CliInst(wait_op_t, D duration) :
    op(CliOp::wait)
  {
    args.wait_time =
      std::chrono::duration_cast<std::chrono::milliseconds>(duration);
  }

  // Builds a request instruction.
  //   count           - how many requests to issue
  //   ops_per_sec     - target request rate; converted to a per-request
  //                     interval of 1/ops_per_sec seconds, rounded to the
  //                     nearest microsecond
  //   max_outstanding - limit on requests that have not been answered yet
  CliInst(req_op_t,
          uint32_t count, double ops_per_sec, uint16_t max_outstanding) :
    op(CliOp::req)
  {
    args.req_params.count = count;
    args.req_params.max_outstanding = max_outstanding;
    args.req_params.time_bw_reqs = interval_for_rate(ops_per_sec);
  }

private:

  // Converts a rate into the interval between requests. The interval is
  // kept within the uint32_t microsecond range the original computation
  // used; converting an out-of-range double to an integer is undefined
  // behavior, so rates whose interval does not fit (including zero,
  // negative and NaN rates, which have no finite non-negative interval)
  // saturate to the largest representable interval instead.
  static std::chrono::microseconds interval_for_rate(double ops_per_sec) {
    constexpr uint32_t max_us = std::numeric_limits<uint32_t>::max();
    if (!(ops_per_sec > 0.0)) {
      return std::chrono::microseconds(max_us);
    }
    // adding 0.5 before truncating rounds to the nearest microsecond
    const double us = 0.5 + 1.0 / ops_per_sec * 1000000;
    if (us >= static_cast<double>(max_us)) {
      return std::chrono::microseconds(max_us);
    }
    return std::chrono::microseconds(static_cast<uint32_t>(us));
  }
};
71
72
73 using ServerSelectFunc = std::function<const ServerId&(uint64_t seed)>;
74
75
76 template<typename SvcTrk, typename ReqPm, typename RespPm, typename Accum>
77 class SimulatedClient {
78 public:
79
80 struct InternalStats {
81 std::mutex mtx;
82 std::chrono::nanoseconds track_resp_time;
83 std::chrono::nanoseconds get_req_params_time;
84 uint32_t track_resp_count;
85 uint32_t get_req_params_count;
86
87 InternalStats() :
88 track_resp_time(0),
89 get_req_params_time(0),
90 track_resp_count(0),
91 get_req_params_count(0)
92 {
93 // empty
94 }
95 };
96
97 using SubmitFunc =
98 std::function<void(const ServerId&,
99 TestRequest&&,
100 const ClientId&,
101 const ReqPm&)>;
102
103 using ClientAccumFunc = std::function<void(Accum&,const RespPm&)>;
104
105 typedef std::chrono::time_point<std::chrono::steady_clock> TimePoint;
106
107 static TimePoint now() { return std::chrono::steady_clock::now(); }
108
109 protected:
110
111 struct RespQueueItem {
112 TestResponse response;
113 ServerId server_id;
114 RespPm resp_params;
115 Cost request_cost;
116 };
117
118 const ClientId id;
119 const SubmitFunc submit_f;
120 const ServerSelectFunc server_select_f;
121 const ClientAccumFunc accum_f;
122
123 std::vector<CliInst> instructions;
124
125 SvcTrk service_tracker;
126
127 // TODO: use lock rather than atomic???
128 std::atomic_ulong outstanding_ops;
129 std::atomic_bool requests_complete;
130
131 std::deque<RespQueueItem> resp_queue;
132
133 std::mutex mtx_req;
134 std::condition_variable cv_req;
135
136 std::mutex mtx_resp;
137 std::condition_variable cv_resp;
138
139 using RespGuard = std::lock_guard<decltype(mtx_resp)>;
140 using Lock = std::unique_lock<std::mutex>;
141
142 // data collection
143
144 std::vector<TimePoint> op_times;
145 Accum accumulator;
146 InternalStats internal_stats;
147
148 std::thread thd_req;
149 std::thread thd_resp;
150
151 public:
152
153 SimulatedClient(ClientId _id,
154 const SubmitFunc& _submit_f,
155 const ServerSelectFunc& _server_select_f,
156 const ClientAccumFunc& _accum_f,
157 const std::vector<CliInst>& _instrs) :
158 id(_id),
159 submit_f(_submit_f),
160 server_select_f(_server_select_f),
161 accum_f(_accum_f),
162 instructions(_instrs),
163 service_tracker(),
164 outstanding_ops(0),
165 requests_complete(false)
166 {
167 size_t op_count = 0;
168 for (auto i : instructions) {
169 if (CliOp::req == i.op) {
170 op_count += i.args.req_params.count;
171 }
172 }
173 op_times.reserve(op_count);
174
175 thd_resp = std::thread(&SimulatedClient::run_resp, this);
176 thd_req = std::thread(&SimulatedClient::run_req, this);
177 }
178
179
180 SimulatedClient(ClientId _id,
181 const SubmitFunc& _submit_f,
182 const ServerSelectFunc& _server_select_f,
183 const ClientAccumFunc& _accum_f,
184 uint16_t _ops_to_run,
185 double _iops_goal,
186 uint16_t _outstanding_ops_allowed) :
187 SimulatedClient(_id,
188 _submit_f, _server_select_f, _accum_f,
189 {{req_op, _ops_to_run, _iops_goal, _outstanding_ops_allowed}})
190 {
191 // empty
192 }
193
194
195 SimulatedClient(const SimulatedClient&) = delete;
196 SimulatedClient(SimulatedClient&&) = delete;
197 SimulatedClient& operator=(const SimulatedClient&) = delete;
198 SimulatedClient& operator=(SimulatedClient&&) = delete;
199
200 virtual ~SimulatedClient() {
201 wait_until_done();
202 }
203
204 void receive_response(const TestResponse& resp,
205 const ServerId& server_id,
206 const RespPm& resp_params,
207 const Cost request_cost) {
208 RespGuard g(mtx_resp);
209 resp_queue.push_back(
210 RespQueueItem{ resp, server_id, resp_params, request_cost });
211 cv_resp.notify_one();
212 }
213
214 const std::vector<TimePoint>& get_op_times() const { return op_times; }
215
216 void wait_until_done() {
217 if (thd_req.joinable()) thd_req.join();
218 if (thd_resp.joinable()) thd_resp.join();
219 }
220
221 const Accum& get_accumulator() const { return accumulator; }
222
223 const InternalStats& get_internal_stats() const { return internal_stats; }
224
225 protected:
226
227 void run_req() {
228 size_t ops_count = 0;
229 for (auto i : instructions) {
230 if (CliOp::wait == i.op) {
231 std::this_thread::sleep_for(i.args.wait_time);
232 } else if (CliOp::req == i.op) {
233 Lock l(mtx_req);
234 for (uint64_t o = 0; o < i.args.req_params.count; ++o) {
235 while (outstanding_ops >= i.args.req_params.max_outstanding) {
236 cv_req.wait(l);
237 }
238
239 l.unlock();
240 auto now = std::chrono::steady_clock::now();
241 const ServerId& server = server_select_f(o);
242
243 ReqPm rp =
244 time_stats_w_return<decltype(internal_stats.get_req_params_time),
245 ReqPm>(internal_stats.mtx,
246 internal_stats.get_req_params_time,
247 [&]() -> ReqPm {
248 return service_tracker.get_req_params(server);
249 });
250 count_stats(internal_stats.mtx,
251 internal_stats.get_req_params_count);
252
253 submit_f(server,
254 TestRequest{server, static_cast<uint32_t>(o), 12},
255 id, rp);
256 ++outstanding_ops;
257 l.lock(); // lock for return to top of loop
258
259 auto delay_time = now + i.args.req_params.time_bw_reqs;
260 while (std::chrono::steady_clock::now() < delay_time) {
261 cv_req.wait_until(l, delay_time);
262 } // while
263 } // for
264 ops_count += i.args.req_params.count;
265 } else {
266 assert(false);
267 }
268 } // for loop
269
270 requests_complete = true;
271
272 // all requests made, thread ends
273 }
274
275
276 void run_resp() {
277 std::chrono::milliseconds delay(1000);
278 int op = 0;
279
280 Lock l(mtx_resp);
281
282 // since the following code would otherwise be repeated (except for
283 // the call to notify_one) in the two loops below; let's avoid
284 // repetition and define it once.
285 const auto proc_resp = [this, &op, &l](const bool notify_req_cv) {
286 if (!resp_queue.empty()) {
287 RespQueueItem item = resp_queue.front();
288 resp_queue.pop_front();
289
290 l.unlock();
291
292 // data collection
293
294 op_times.push_back(now());
295 accum_f(accumulator, item.resp_params);
296
297 // processing
298
299 #if 0 // not needed
300 TestResponse& resp = item.response;
301 #endif
302
303 time_stats(internal_stats.mtx,
304 internal_stats.track_resp_time,
305 [&](){
306 service_tracker.track_resp(item.server_id, item.resp_params, item.request_cost);
307 });
308 count_stats(internal_stats.mtx,
309 internal_stats.track_resp_count);
310
311 --outstanding_ops;
312 if (notify_req_cv) {
313 cv_req.notify_one();
314 }
315
316 l.lock();
317 }
318 };
319
320 while(!requests_complete.load()) {
321 while(resp_queue.empty() && !requests_complete.load()) {
322 cv_resp.wait_for(l, delay);
323 }
324 proc_resp(true);
325 }
326
327 while(outstanding_ops.load() > 0) {
328 while(resp_queue.empty() && outstanding_ops.load() > 0) {
329 cv_resp.wait_for(l, delay);
330 }
331 proc_resp(false); // don't call notify_one as all requests are complete
332 }
333
334 // all responses received, thread ends
335 }
336 }; // class SimulatedClient
337
338
339 }; // namespace qos_simulation
340 }; // namespace crimson