]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | /* | |
5 | * Copyright (C) 2016 Red Hat Inc. | |
6 | */ | |
7 | ||
8 | ||
9 | #pragma once | |
10 | ||
11 | ||
12 | #include <thread> | |
13 | #include <mutex> | |
14 | #include <condition_variable> | |
15 | #include <chrono> | |
16 | #include <deque> | |
17 | ||
18 | #include "sim_recs.h" | |
19 | ||
20 | ||
21 | namespace crimson { | |
22 | namespace qos_simulation { | |
23 | ||
24 | template<typename Q, typename ReqPm, typename RespPm, typename Accum> | |
25 | class SimulatedServer { | |
26 | ||
27 | struct QueueItem { | |
28 | ClientId client; | |
29 | std::unique_ptr<TestRequest> request; | |
30 | RespPm additional; | |
31 | ||
32 | QueueItem(const ClientId& _client, | |
33 | std::unique_ptr<TestRequest>&& _request, | |
34 | const RespPm& _additional) : | |
35 | client(_client), | |
36 | request(std::move(_request)), | |
37 | additional(_additional) | |
38 | { | |
39 | // empty | |
40 | } | |
41 | }; // QueueItem | |
42 | ||
43 | public: | |
44 | ||
45 | struct InternalStats { | |
46 | std::mutex mtx; | |
47 | std::chrono::nanoseconds add_request_time; | |
48 | std::chrono::nanoseconds request_complete_time; | |
49 | uint32_t add_request_count; | |
50 | uint32_t request_complete_count; | |
51 | ||
52 | InternalStats() : | |
53 | add_request_time(0), | |
54 | request_complete_time(0), | |
55 | add_request_count(0), | |
56 | request_complete_count(0) | |
57 | { | |
58 | // empty | |
59 | } | |
60 | }; | |
61 | ||
62 | using ClientRespFunc = std::function<void(ClientId, | |
63 | const TestResponse&, | |
64 | const ServerId&, | |
65 | const RespPm&)>; | |
66 | ||
67 | using ServerAccumFunc = std::function<void(Accum& accumulator, | |
68 | const RespPm& additional)>; | |
69 | ||
70 | protected: | |
71 | ||
72 | const ServerId id; | |
73 | Q* priority_queue; | |
74 | ClientRespFunc client_resp_f; | |
75 | int iops; | |
76 | size_t thread_pool_size; | |
77 | ||
78 | bool finishing; | |
79 | std::chrono::microseconds op_time; | |
80 | ||
81 | std::mutex inner_queue_mtx; | |
82 | std::condition_variable inner_queue_cv; | |
83 | std::deque<QueueItem> inner_queue; | |
84 | ||
85 | std::thread* threads; | |
86 | ||
87 | using InnerQGuard = std::lock_guard<decltype(inner_queue_mtx)>; | |
88 | using Lock = std::unique_lock<std::mutex>; | |
89 | ||
90 | // data collection | |
91 | ||
92 | ServerAccumFunc accum_f; | |
93 | Accum accumulator; | |
94 | ||
95 | InternalStats internal_stats; | |
96 | ||
97 | public: | |
98 | ||
99 | using CanHandleRequestFunc = std::function<bool(void)>; | |
100 | using HandleRequestFunc = | |
101 | std::function<void(const ClientId&,std::unique_ptr<TestRequest>,const RespPm&)>; | |
102 | using CreateQueueF = std::function<Q*(CanHandleRequestFunc,HandleRequestFunc)>; | |
103 | ||
104 | ||
105 | SimulatedServer(ServerId _id, | |
106 | int _iops, | |
107 | size_t _thread_pool_size, | |
108 | const ClientRespFunc& _client_resp_f, | |
109 | const ServerAccumFunc& _accum_f, | |
110 | CreateQueueF _create_queue_f) : | |
111 | id(_id), | |
112 | priority_queue(_create_queue_f(std::bind(&SimulatedServer::has_avail_thread, | |
113 | this), | |
114 | std::bind(&SimulatedServer::inner_post, | |
115 | this, | |
116 | std::placeholders::_1, | |
117 | std::placeholders::_2, | |
118 | std::placeholders::_3))), | |
119 | client_resp_f(_client_resp_f), | |
120 | iops(_iops), | |
121 | thread_pool_size(_thread_pool_size), | |
122 | finishing(false), | |
123 | accum_f(_accum_f) | |
124 | { | |
125 | op_time = | |
126 | std::chrono::microseconds((int) (0.5 + | |
127 | thread_pool_size * 1000000.0 / iops)); | |
128 | std::chrono::milliseconds delay(1000); | |
129 | threads = new std::thread[thread_pool_size]; | |
130 | for (size_t i = 0; i < thread_pool_size; ++i) { | |
131 | threads[i] = std::thread(&SimulatedServer::run, this, delay); | |
132 | } | |
133 | } | |
134 | ||
135 | virtual ~SimulatedServer() { | |
136 | Lock l(inner_queue_mtx); | |
137 | finishing = true; | |
138 | inner_queue_cv.notify_all(); | |
139 | l.unlock(); | |
140 | ||
141 | for (size_t i = 0; i < thread_pool_size; ++i) { | |
142 | threads[i].join(); | |
143 | } | |
144 | ||
145 | delete[] threads; | |
31f18b77 FG |
146 | |
147 | delete priority_queue; | |
7c673cae FG |
148 | } |
149 | ||
d2e6a577 | 150 | void post(TestRequest&& request, |
7c673cae FG |
151 | const ClientId& client_id, |
152 | const ReqPm& req_params) | |
153 | { | |
154 | time_stats(internal_stats.mtx, | |
155 | internal_stats.add_request_time, | |
156 | [&](){ | |
d2e6a577 FG |
157 | priority_queue->add_request(std::move(request), |
158 | client_id, req_params); | |
7c673cae FG |
159 | }); |
160 | count_stats(internal_stats.mtx, | |
161 | internal_stats.add_request_count); | |
162 | } | |
163 | ||
164 | bool has_avail_thread() { | |
165 | InnerQGuard g(inner_queue_mtx); | |
166 | return inner_queue.size() <= thread_pool_size; | |
167 | } | |
168 | ||
169 | const Accum& get_accumulator() const { return accumulator; } | |
170 | const Q& get_priority_queue() const { return *priority_queue; } | |
171 | const InternalStats& get_internal_stats() const { return internal_stats; } | |
172 | ||
173 | protected: | |
174 | ||
175 | void inner_post(const ClientId& client, | |
176 | std::unique_ptr<TestRequest> request, | |
177 | const RespPm& additional) { | |
178 | Lock l(inner_queue_mtx); | |
179 | assert(!finishing); | |
180 | accum_f(accumulator, additional); | |
181 | inner_queue.emplace_back(QueueItem(client, | |
182 | std::move(request), | |
183 | additional)); | |
184 | inner_queue_cv.notify_one(); | |
185 | } | |
186 | ||
187 | void run(std::chrono::milliseconds check_period) { | |
188 | Lock l(inner_queue_mtx); | |
189 | while(true) { | |
190 | while(inner_queue.empty() && !finishing) { | |
191 | inner_queue_cv.wait_for(l, check_period); | |
192 | } | |
193 | if (!inner_queue.empty()) { | |
194 | auto& front = inner_queue.front(); | |
195 | auto client = front.client; | |
196 | auto req = std::move(front.request); | |
197 | auto additional = front.additional; | |
198 | inner_queue.pop_front(); | |
199 | ||
200 | l.unlock(); | |
201 | ||
202 | // simulation operation by sleeping; then call function to | |
203 | // notify server of completion | |
204 | std::this_thread::sleep_for(op_time); | |
205 | ||
7c673cae FG |
206 | // TODO: rather than assuming this constructor exists, perhaps |
207 | // pass in a function that does this mapping? | |
d2e6a577 | 208 | client_resp_f(client, TestResponse{req->epoch}, id, additional); |
7c673cae FG |
209 | |
210 | time_stats(internal_stats.mtx, | |
211 | internal_stats.request_complete_time, | |
212 | [&](){ | |
213 | priority_queue->request_completed(); | |
214 | }); | |
215 | count_stats(internal_stats.mtx, | |
216 | internal_stats.request_complete_count); | |
217 | ||
218 | l.lock(); // in prep for next iteration of loop | |
219 | } else { | |
220 | break; | |
221 | } | |
222 | } | |
223 | } | |
224 | }; // class SimulatedServer | |
225 | ||
226 | }; // namespace qos_simulation | |
227 | }; // namespace crimson |