1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 * Copyright (C) 2016 Red Hat Inc.
14 #include <condition_variable>
22 namespace qos_simulation
{
// Simulated server class template. As used in the members that follow:
//   Q      - type of the object held in priority_queue; add_request() and
//            request_completed() are called on it
//   ReqPm  - per-request parameter type accepted by post()
//   RespPm - type of the "additional" value stored with each queued item
//            and handed to the client response callback
//   Accum  - accumulator type updated through accum_f
// NOTE(review): the original line numbers embedded in this listing skip
// values, so some lines of the class are not shown here.
24 template<typename Q
, typename ReqPm
, typename RespPm
, typename Accum
>
25 class SimulatedServer
{
// QueueItem: element type of inner_queue. It owns its request through a
// unique_ptr.
// NOTE(review): the struct header and the declarations of the client and
// additional members (read by run() as front.client / front.additional)
// are not part of this listing.
29 std::unique_ptr
<TestRequest
> request
;
// Constructor: takes _request by rvalue reference and moves it into the
// request member; additional is initialized from _additional.
32 QueueItem(const ClientId
& _client
,
33 std::unique_ptr
<TestRequest
>&& _request
,
34 const RespPm
& _additional
) :
36 request(std::move(_request
)),
37 additional(_additional
)
// InternalStats: duration and counter fields for the two queue operations
// this class performs (add_request in post(), request_completed in run()).
// The fields are passed to time_stats()/count_stats() together with
// internal_stats.mtx.
// NOTE(review): the mtx member is referenced elsewhere in the class but its
// declaration is not shown in this listing.
45 struct InternalStats
{
// nanosecond field handed to time_stats() from post()
47 std::chrono::nanoseconds add_request_time
;
// nanosecond field handed to time_stats() from run()
48 std::chrono::nanoseconds request_complete_time
;
// counter handed to count_stats() from post()
49 uint32_t add_request_count
;
// counter handed to count_stats() from run()
50 uint32_t request_complete_count
;
// Constructor initializers: the visible ones start the fields at zero.
54 request_complete_time(0),
56 request_complete_count(0)
// ClientRespFunc: type of the response callback stored in client_resp_f.
// run() invokes it as client_resp_f(client, resp, id, additional).
// NOTE(review): only the first parameter (ClientId) is visible here; the
// rest of the signature is not shown in this listing.
62 using ClientRespFunc
= std::function
<void(ClientId
,
// ServerAccumFunc: type of the callback stored in accum_f. It receives the
// accumulator by non-const reference plus the per-request "additional"
// value; inner_post() calls it once per queued request.
67 using ServerAccumFunc
= std::function
<void(Accum
& accumulator
,
68 const RespPm
& additional
)>;
// Data members of SimulatedServer.
// NOTE(review): several members used elsewhere in the class (id,
// priority_queue, iops, finishing, threads, accumulator) are declared on
// lines that are not shown in this listing.

// response callback; called by run() after the simulated operation
74 ClientRespFunc client_resp_f
;
// number of worker threads started by the constructor; also the bound
// compared against inner_queue.size() in has_avail_thread()
76 size_t thread_pool_size
;
// how long run() sleeps for each request
79 std::chrono::microseconds op_time
;
// mutex locked around accesses to inner_queue (inner_post, run,
// has_avail_thread) and in the destructor
81 std::mutex inner_queue_mtx
;
// notified by inner_post() (notify_one) and the destructor (notify_all);
// waited on, with a timeout, by run()
82 std::condition_variable inner_queue_cv
;
// FIFO of pending work: inner_post() appends at the back, run() removes
// from the front
83 std::deque
<QueueItem
> inner_queue
;
// scoped guard type used by has_avail_thread()
87 using InnerQGuard
= std::lock_guard
<decltype(inner_queue_mtx
)>;
// unique_lock type used where the lock is handed to wait_for() or
// re-locked manually (see run())
88 using Lock
= std::unique_lock
<std::mutex
>;
// accumulation callback; called by inner_post()
92 ServerAccumFunc accum_f
;
// timing/count statistics, exposed via get_internal_stats()
95 InternalStats internal_stats
;
// Callback types exchanged with the priority queue.

// CanHandleRequestFunc: nullary predicate returning bool; the constructor
// binds has_avail_thread to it.
99 using CanHandleRequestFunc
= std::function
<bool(void)>;
// HandleRequestFunc: receives a client id, ownership of the request
// (unique_ptr passed by value) and the RespPm value; the constructor binds
// inner_post to it.
100 using HandleRequestFunc
=
101 std::function
<void(const ClientId
&,std::unique_ptr
<TestRequest
>,const RespPm
&)>;
// CreateQueueF: factory that is given both callbacks and returns a raw Q*.
// The constructor stores the result in priority_queue and the destructor
// deletes it.
102 using CreateQueueF
= std::function
<Q
*(CanHandleRequestFunc
,HandleRequestFunc
)>;
// Constructor. Creates the priority queue through _create_queue_f, stores
// the callbacks and pool size, derives an operation time from the pool size
// and iops, and starts thread_pool_size worker threads running run().
// NOTE(review): some parameters and member initializers (original lines
// 106, 111, 113, 115, 120, 122-125) are not shown in this listing.
105 SimulatedServer(ServerId _id
,
107 size_t _thread_pool_size
,
108 const ClientRespFunc
& _client_resp_f
,
109 const ServerAccumFunc
& _accum_f
,
110 CreateQueueF _create_queue_f
) :
// The queue is handed two bound member functions: has_avail_thread as the
// "can handle" predicate and inner_post (three placeholders: client,
// request, additional) as the "handle request" callback.
112 priority_queue(_create_queue_f(std::bind(&SimulatedServer::has_avail_thread
,
114 std::bind(&SimulatedServer::inner_post
,
116 std::placeholders::_1
,
117 std::placeholders::_2
,
118 std::placeholders::_3
))),
119 client_resp_f(_client_resp_f
),
121 thread_pool_size(_thread_pool_size
),
// Microseconds value = thread_pool_size * 1e6 / iops; adding 0.5 before
// the (int) cast rounds to nearest instead of truncating. Presumably this
// is assigned to op_time on the preceding line, which is not shown.
// NOTE(review): iops is used as a divisor here; confirm callers never pass
// zero.
126 std::chrono::microseconds((int) (0.5 +
127 thread_pool_size
* 1000000.0 / iops
));
// passed to each run() call as its check_period (wait_for timeout)
128 std::chrono::milliseconds
delay(1000);
// Raw array of default-constructed threads; each slot is then assigned a
// thread running run(delay) on this object.
// NOTE(review): the array is allocated with new[]; the matching delete[]
// is not visible in the destructor lines shown in this listing.
129 threads
= new std::thread
[thread_pool_size
];
130 for (size_t i
= 0; i
< thread_pool_size
; ++i
) {
131 threads
[i
] = std::thread(&SimulatedServer::run
, this, delay
);
// Destructor. Takes inner_queue_mtx, wakes every thread waiting on
// inner_queue_cv, iterates over the thread pool, and deletes the priority
// queue obtained from the factory in the constructor.
// NOTE(review): original lines 137, 139-140 and 142-146 are not shown in
// this listing. Presumably they set the finishing flag that run() tests,
// release the lock, and join/free the worker threads - confirm against the
// full source.
135 virtual ~SimulatedServer() {
136 Lock
l(inner_queue_mtx
);
// wake all workers blocked in run()'s wait_for()
138 inner_queue_cv
.notify_all();
// one iteration per worker thread (loop body not shown)
141 for (size_t i
= 0; i
< thread_pool_size
; ++i
) {
147 delete priority_queue
;
// Submit a request to the priority queue.
//   request    - the request to enqueue
//   client_id  - the client the request belongs to
//   req_params - per-request parameters forwarded to the queue
// The request is forwarded to priority_queue->add_request(); time_stats()
// is called with internal_stats.add_request_time and count_stats() with
// internal_stats.add_request_count.
// NOTE(review): original line 156 (between the time_stats arguments and
// the add_request call) is not shown; presumably add_request is wrapped in
// a callable that time_stats() times - confirm against the full source.
150 void post(const TestRequest
& request
,
151 const ClientId
& client_id
,
152 const ReqPm
& req_params
)
154 time_stats(internal_stats
.mtx
,
155 internal_stats
.add_request_time
,
157 priority_queue
->add_request(request
, client_id
, req_params
);
159 count_stats(internal_stats
.mtx
,
160 internal_stats
.add_request_count
);
// Predicate bound by the constructor as the queue's "can handle request"
// callback. Holds inner_queue_mtx for the duration of the check and returns
// true when the number of items waiting in inner_queue does not exceed
// thread_pool_size.
// NOTE(review): the comparison is <=, so this still returns true when the
// queue already holds thread_pool_size items; confirm that is intended
// rather than <.
163 bool has_avail_thread() {
164 InnerQGuard
g(inner_queue_mtx
);
165 return inner_queue
.size() <= thread_pool_size
;
// Read-only accessors. Each returns a const reference to state held by this
// object; none of them takes a lock before returning.

// the accumulator that accum_f updates in inner_post()
168 const Accum
& get_accumulator() const { return accumulator
; }
// the queue object created by the factory (dereferences the stored pointer)
169 const Q
& get_priority_queue() const { return *priority_queue
; }
// the timing/count statistics updated by post() and run()
170 const InternalStats
& get_internal_stats() const { return internal_stats
; }
// Callback bound by the constructor as the queue's "handle request"
// function. While holding inner_queue_mtx it:
//   1. calls accum_f(accumulator, additional),
//   2. appends a QueueItem for the client to the back of inner_queue,
//   3. wakes one thread waiting on inner_queue_cv.
//   client     - id of the client that issued the request
//   request    - the request; ownership is received by value
//   additional - RespPm value passed to accum_f
// NOTE(review): original lines 178 and 181-182 are not shown in this
// listing; the remaining QueueItem constructor arguments are on those
// lines.
174 void inner_post(const ClientId
& client
,
175 std::unique_ptr
<TestRequest
> request
,
176 const RespPm
& additional
) {
177 Lock
l(inner_queue_mtx
);
179 accum_f(accumulator
, additional
);
180 inner_queue
.emplace_back(QueueItem(client
,
// wake a single worker blocked in run()
183 inner_queue_cv
.notify_one();
// Worker-thread body; the constructor starts one std::thread per pool slot
// on this function.
//   check_period - timeout for each wait on inner_queue_cv, so the
//                  empty/finishing condition is re-tested at least that
//                  often
// With inner_queue_mtx held it waits until inner_queue is non-empty or
// finishing is set. If an item is available it is removed from the front of
// the queue, the thread sleeps for op_time, client_resp_f is called with a
// TestResponse built from the request's epoch, and request_completed() is
// called on the priority queue; the statistics in internal_stats are
// updated through time_stats()/count_stats().
// NOTE(review): original lines 188, 191, 198-200, 204, 209, 212, 214, 217
// and 219-223 are not shown in this listing. The trailing l.lock() implies
// the lock is released before the sleep and that the visible statements sit
// inside an outer loop - confirm against the full source.
186 void run(std::chrono::milliseconds check_period
) {
187 Lock
l(inner_queue_mtx
);
// timed wait: loops until there is work or finishing is set
189 while(inner_queue
.empty() && !finishing
) {
190 inner_queue_cv
.wait_for(l
, check_period
);
192 if (!inner_queue
.empty()) {
// front is a reference into the deque, so everything needed later is
// copied or moved out of it before pop_front() destroys the element
193 auto& front
= inner_queue
.front();
194 auto client
= front
.client
;
195 auto req
= std::move(front
.request
);
196 auto additional
= front
.additional
;
197 inner_queue
.pop_front();
201 // simulate the operation by sleeping; then call client_resp_f and
202 // report the completion to the priority queue
203 std::this_thread::sleep_for(op_time
);
205 TestResponse
resp(req
->epoch
);
206 // TODO: rather than assuming this constructor exists, perhaps
207 // pass in a function that does this mapping?
208 client_resp_f(client
, resp
, id
, additional
);
210 time_stats(internal_stats
.mtx
,
211 internal_stats
.request_complete_time
,
213 priority_queue
->request_completed();
215 count_stats(internal_stats
.mtx
,
216 internal_stats
.request_complete_count
);
218 l
.lock(); // in prep for next iteration of loop
224 }; // class SimulatedServer
226 }; // namespace qos_simulation
227 }; // namespace crimson