1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 * Copyright (C) 2016 Red Hat Inc.
7 * Author: J. Eric Ivancich <ivancich@redhat.com>
9 * This is free software; you can redistribute it and/or modify it
10 * under the terms of the GNU Lesser General Public License version
11 * 2.1, as published by the Free Software Foundation. See file
21 #include <condition_variable>
29 namespace qos_simulation
{
31 template<typename Q
, typename ReqPm
, typename RespPm
, typename Accum
>
32 class SimulatedServer
{
36 std::unique_ptr
<TestRequest
> request
;
40 QueueItem(const ClientId
& _client
,
41 std::unique_ptr
<TestRequest
>&& _request
,
42 const RespPm
& _additional
,
43 const Cost _request_cost
) :
45 request(std::move(_request
)),
46 additional(_additional
),
47 request_cost(_request_cost
)
55 struct InternalStats
{
57 std::chrono::nanoseconds add_request_time
;
58 std::chrono::nanoseconds request_complete_time
;
59 uint32_t add_request_count
;
60 uint32_t request_complete_count
;
64 request_complete_time(0),
66 request_complete_count(0)
72 using ClientRespFunc
= std::function
<void(ClientId
,
78 using ServerAccumFunc
= std::function
<void(Accum
& accumulator
,
79 const RespPm
& additional
)>;
85 ClientRespFunc client_resp_f
;
87 size_t thread_pool_size
;
90 std::chrono::microseconds op_time
;
92 std::mutex inner_queue_mtx
;
93 std::condition_variable inner_queue_cv
;
94 std::deque
<QueueItem
> inner_queue
;
98 using InnerQGuard
= std::lock_guard
<decltype(inner_queue_mtx
)>;
99 using Lock
= std::unique_lock
<std::mutex
>;
103 ServerAccumFunc accum_f
;
106 InternalStats internal_stats
;
110 using CanHandleRequestFunc
= std::function
<bool(void)>;
111 using HandleRequestFunc
=
112 std::function
<void(const ClientId
&,std::unique_ptr
<TestRequest
>,const RespPm
&, uint64_t)>;
113 using CreateQueueF
= std::function
<Q
*(CanHandleRequestFunc
,HandleRequestFunc
)>;
116 SimulatedServer(ServerId _id
,
118 size_t _thread_pool_size
,
119 const ClientRespFunc
& _client_resp_f
,
120 const ServerAccumFunc
& _accum_f
,
121 CreateQueueF _create_queue_f
) :
123 priority_queue(_create_queue_f(std::bind(&SimulatedServer::has_avail_thread
,
125 std::bind(&SimulatedServer::inner_post
,
127 std::placeholders::_1
,
128 std::placeholders::_2
,
129 std::placeholders::_3
,
130 std::placeholders::_4
))),
131 client_resp_f(_client_resp_f
),
133 thread_pool_size(_thread_pool_size
),
138 std::chrono::microseconds((int) (0.5 +
139 thread_pool_size
* 1000000.0 / iops
));
140 std::chrono::milliseconds
finishing_check_period(1000);
141 threads
= new std::thread
[thread_pool_size
];
142 for (size_t i
= 0; i
< thread_pool_size
; ++i
) {
143 threads
[i
] = std::thread(&SimulatedServer::run
, this, finishing_check_period
);
147 virtual ~SimulatedServer() {
148 Lock
l(inner_queue_mtx
);
150 inner_queue_cv
.notify_all();
153 for (size_t i
= 0; i
< thread_pool_size
; ++i
) {
159 delete priority_queue
;
162 void post(TestRequest
&& request
,
163 const ClientId
& client_id
,
164 const ReqPm
& req_params
,
165 const Cost request_cost
)
167 time_stats(internal_stats
.mtx
,
168 internal_stats
.add_request_time
,
170 priority_queue
->add_request(std::move(request
),
175 count_stats(internal_stats
.mtx
,
176 internal_stats
.add_request_count
);
179 bool has_avail_thread() {
180 InnerQGuard
g(inner_queue_mtx
);
181 return inner_queue
.size() <= thread_pool_size
;
184 const Accum
& get_accumulator() const { return accumulator
; }
185 const Q
& get_priority_queue() const { return *priority_queue
; }
186 const InternalStats
& get_internal_stats() const { return internal_stats
; }
190 void inner_post(const ClientId
& client
,
191 std::unique_ptr
<TestRequest
> request
,
192 const RespPm
& additional
,
193 const Cost request_cost
) {
194 Lock
l(inner_queue_mtx
);
196 accum_f(accumulator
, additional
);
197 inner_queue
.emplace_back(QueueItem(client
,
201 inner_queue_cv
.notify_one();
204 void run(std::chrono::milliseconds check_period
) {
205 Lock
l(inner_queue_mtx
);
207 while(inner_queue
.empty() && !finishing
) {
208 inner_queue_cv
.wait_for(l
, check_period
);
210 if (!inner_queue
.empty()) {
211 auto& front
= inner_queue
.front();
212 auto client
= front
.client
;
213 auto req
= std::move(front
.request
);
214 auto additional
= front
.additional
;
215 auto request_cost
= front
.request_cost
;
216 inner_queue
.pop_front();
220 // simulation operation by sleeping; then call function to
221 // notify server of completion
222 std::this_thread::sleep_for(op_time
* request_cost
);
224 // TODO: rather than assuming this constructor exists, perhaps
225 // pass in a function that does this mapping?
226 client_resp_f(client
, TestResponse
{req
->epoch
}, id
, additional
, request_cost
);
228 time_stats(internal_stats
.mtx
,
229 internal_stats
.request_complete_time
,
231 priority_queue
->request_completed();
233 count_stats(internal_stats
.mtx
,
234 internal_stats
.request_complete_count
);
236 l
.lock(); // in prep for next iteration of loop
242 }; // class SimulatedServer
244 }; // namespace qos_simulation
245 }; // namespace crimson