]> git.proxmox.com Git - ceph.git/blob - ceph/src/dmclock/sim/src/sim_server.h
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / dmclock / sim / src / sim_server.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 /*
5 * Copyright (C) 2016 Red Hat Inc.
6 *
7 * Author: J. Eric Ivancich <ivancich@redhat.com>
8 *
9 * This is free software; you can redistribute it and/or modify it
10 * under the terms of the GNU Lesser General Public License version
11 * 2.1, as published by the Free Software Foundation. See file
12 * COPYING.
13 */
14
15
16 #pragma once
17
18
19 #include <thread>
20 #include <mutex>
21 #include <condition_variable>
22 #include <chrono>
23 #include <deque>
24
25 #include "sim_recs.h"
26
27
28 namespace crimson {
29 namespace qos_simulation {
30
31 template<typename Q, typename ReqPm, typename RespPm, typename Accum>
32 class SimulatedServer {
33
34 struct QueueItem {
35 ClientId client;
36 std::unique_ptr<TestRequest> request;
37 RespPm additional;
38 Cost request_cost;
39
40 QueueItem(const ClientId& _client,
41 std::unique_ptr<TestRequest>&& _request,
42 const RespPm& _additional,
43 const Cost _request_cost) :
44 client(_client),
45 request(std::move(_request)),
46 additional(_additional),
47 request_cost(_request_cost)
48 {
49 // empty
50 }
51 }; // QueueItem
52
53 public:
54
55 struct InternalStats {
56 std::mutex mtx;
57 std::chrono::nanoseconds add_request_time;
58 std::chrono::nanoseconds request_complete_time;
59 uint32_t add_request_count;
60 uint32_t request_complete_count;
61
62 InternalStats() :
63 add_request_time(0),
64 request_complete_time(0),
65 add_request_count(0),
66 request_complete_count(0)
67 {
68 // empty
69 }
70 };
71
72 using ClientRespFunc = std::function<void(ClientId,
73 const TestResponse&,
74 const ServerId&,
75 const RespPm&,
76 const Cost)>;
77
78 using ServerAccumFunc = std::function<void(Accum& accumulator,
79 const RespPm& additional)>;
80
81 protected:
82
83 const ServerId id;
84 Q* priority_queue;
85 ClientRespFunc client_resp_f;
86 int iops;
87 size_t thread_pool_size;
88
89 bool finishing;
90 std::chrono::microseconds op_time;
91
92 std::mutex inner_queue_mtx;
93 std::condition_variable inner_queue_cv;
94 std::deque<QueueItem> inner_queue;
95
96 std::thread* threads;
97
98 using InnerQGuard = std::lock_guard<decltype(inner_queue_mtx)>;
99 using Lock = std::unique_lock<std::mutex>;
100
101 // data collection
102
103 ServerAccumFunc accum_f;
104 Accum accumulator;
105
106 InternalStats internal_stats;
107
108 public:
109
110 using CanHandleRequestFunc = std::function<bool(void)>;
111 using HandleRequestFunc =
112 std::function<void(const ClientId&,std::unique_ptr<TestRequest>,const RespPm&, uint64_t)>;
113 using CreateQueueF = std::function<Q*(CanHandleRequestFunc,HandleRequestFunc)>;
114
115
116 SimulatedServer(ServerId _id,
117 int _iops,
118 size_t _thread_pool_size,
119 const ClientRespFunc& _client_resp_f,
120 const ServerAccumFunc& _accum_f,
121 CreateQueueF _create_queue_f) :
122 id(_id),
123 priority_queue(_create_queue_f(std::bind(&SimulatedServer::has_avail_thread,
124 this),
125 std::bind(&SimulatedServer::inner_post,
126 this,
127 std::placeholders::_1,
128 std::placeholders::_2,
129 std::placeholders::_3,
130 std::placeholders::_4))),
131 client_resp_f(_client_resp_f),
132 iops(_iops),
133 thread_pool_size(_thread_pool_size),
134 finishing(false),
135 accum_f(_accum_f)
136 {
137 op_time =
138 std::chrono::microseconds((int) (0.5 +
139 thread_pool_size * 1000000.0 / iops));
140 std::chrono::milliseconds finishing_check_period(1000);
141 threads = new std::thread[thread_pool_size];
142 for (size_t i = 0; i < thread_pool_size; ++i) {
143 threads[i] = std::thread(&SimulatedServer::run, this, finishing_check_period);
144 }
145 }
146
147 virtual ~SimulatedServer() {
148 Lock l(inner_queue_mtx);
149 finishing = true;
150 inner_queue_cv.notify_all();
151 l.unlock();
152
153 for (size_t i = 0; i < thread_pool_size; ++i) {
154 threads[i].join();
155 }
156
157 delete[] threads;
158
159 delete priority_queue;
160 }
161
162 void post(TestRequest&& request,
163 const ClientId& client_id,
164 const ReqPm& req_params,
165 const Cost request_cost)
166 {
167 time_stats(internal_stats.mtx,
168 internal_stats.add_request_time,
169 [&](){
170 priority_queue->add_request(std::move(request),
171 client_id,
172 req_params,
173 request_cost);
174 });
175 count_stats(internal_stats.mtx,
176 internal_stats.add_request_count);
177 }
178
179 bool has_avail_thread() {
180 InnerQGuard g(inner_queue_mtx);
181 return inner_queue.size() <= thread_pool_size;
182 }
183
184 const Accum& get_accumulator() const { return accumulator; }
185 const Q& get_priority_queue() const { return *priority_queue; }
186 const InternalStats& get_internal_stats() const { return internal_stats; }
187
188 protected:
189
190 void inner_post(const ClientId& client,
191 std::unique_ptr<TestRequest> request,
192 const RespPm& additional,
193 const Cost request_cost) {
194 Lock l(inner_queue_mtx);
195 assert(!finishing);
196 accum_f(accumulator, additional);
197 inner_queue.emplace_back(QueueItem(client,
198 std::move(request),
199 additional,
200 request_cost));
201 inner_queue_cv.notify_one();
202 }
203
204 void run(std::chrono::milliseconds check_period) {
205 Lock l(inner_queue_mtx);
206 while(true) {
207 while(inner_queue.empty() && !finishing) {
208 inner_queue_cv.wait_for(l, check_period);
209 }
210 if (!inner_queue.empty()) {
211 auto& front = inner_queue.front();
212 auto client = front.client;
213 auto req = std::move(front.request);
214 auto additional = front.additional;
215 auto request_cost = front.request_cost;
216 inner_queue.pop_front();
217
218 l.unlock();
219
220 // simulation operation by sleeping; then call function to
221 // notify server of completion
222 std::this_thread::sleep_for(op_time * request_cost);
223
224 // TODO: rather than assuming this constructor exists, perhaps
225 // pass in a function that does this mapping?
226 client_resp_f(client, TestResponse{req->epoch}, id, additional, request_cost);
227
228 time_stats(internal_stats.mtx,
229 internal_stats.request_complete_time,
230 [&](){
231 priority_queue->request_completed();
232 });
233 count_stats(internal_stats.mtx,
234 internal_stats.request_complete_count);
235
236 l.lock(); // in prep for next iteration of loop
237 } else {
238 break;
239 }
240 }
241 }
242 }; // class SimulatedServer
243
244 }; // namespace qos_simulation
245 }; // namespace crimson