]> git.proxmox.com Git - ceph.git/blob - ceph/src/dmclock/sim/src/sim_server.h
e318d6e90ac328e6810aa4b7a1fc4f628003ee37
[ceph.git] / ceph / src / dmclock / sim / src / sim_server.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 /*
5 * Copyright (C) 2016 Red Hat Inc.
6 */
7
8
9 #pragma once
10
11
12 #include <thread>
13 #include <mutex>
14 #include <condition_variable>
15 #include <chrono>
16 #include <deque>
17
18 #include "sim_recs.h"
19
20
21 namespace crimson {
22 namespace qos_simulation {
23
24 template<typename Q, typename ReqPm, typename RespPm, typename Accum>
25 class SimulatedServer {
26
27 struct QueueItem {
28 ClientId client;
29 std::unique_ptr<TestRequest> request;
30 RespPm additional;
31
32 QueueItem(const ClientId& _client,
33 std::unique_ptr<TestRequest>&& _request,
34 const RespPm& _additional) :
35 client(_client),
36 request(std::move(_request)),
37 additional(_additional)
38 {
39 // empty
40 }
41 }; // QueueItem
42
43 public:
44
45 struct InternalStats {
46 std::mutex mtx;
47 std::chrono::nanoseconds add_request_time;
48 std::chrono::nanoseconds request_complete_time;
49 uint32_t add_request_count;
50 uint32_t request_complete_count;
51
52 InternalStats() :
53 add_request_time(0),
54 request_complete_time(0),
55 add_request_count(0),
56 request_complete_count(0)
57 {
58 // empty
59 }
60 };
61
62 using ClientRespFunc = std::function<void(ClientId,
63 const TestResponse&,
64 const ServerId&,
65 const RespPm&)>;
66
67 using ServerAccumFunc = std::function<void(Accum& accumulator,
68 const RespPm& additional)>;
69
70 protected:
71
72 const ServerId id;
73 Q* priority_queue;
74 ClientRespFunc client_resp_f;
75 int iops;
76 size_t thread_pool_size;
77
78 bool finishing;
79 std::chrono::microseconds op_time;
80
81 std::mutex inner_queue_mtx;
82 std::condition_variable inner_queue_cv;
83 std::deque<QueueItem> inner_queue;
84
85 std::thread* threads;
86
87 using InnerQGuard = std::lock_guard<decltype(inner_queue_mtx)>;
88 using Lock = std::unique_lock<std::mutex>;
89
90 // data collection
91
92 ServerAccumFunc accum_f;
93 Accum accumulator;
94
95 InternalStats internal_stats;
96
97 public:
98
99 using CanHandleRequestFunc = std::function<bool(void)>;
100 using HandleRequestFunc =
101 std::function<void(const ClientId&,std::unique_ptr<TestRequest>,const RespPm&)>;
102 using CreateQueueF = std::function<Q*(CanHandleRequestFunc,HandleRequestFunc)>;
103
104
105 SimulatedServer(ServerId _id,
106 int _iops,
107 size_t _thread_pool_size,
108 const ClientRespFunc& _client_resp_f,
109 const ServerAccumFunc& _accum_f,
110 CreateQueueF _create_queue_f) :
111 id(_id),
112 priority_queue(_create_queue_f(std::bind(&SimulatedServer::has_avail_thread,
113 this),
114 std::bind(&SimulatedServer::inner_post,
115 this,
116 std::placeholders::_1,
117 std::placeholders::_2,
118 std::placeholders::_3))),
119 client_resp_f(_client_resp_f),
120 iops(_iops),
121 thread_pool_size(_thread_pool_size),
122 finishing(false),
123 accum_f(_accum_f)
124 {
125 op_time =
126 std::chrono::microseconds((int) (0.5 +
127 thread_pool_size * 1000000.0 / iops));
128 std::chrono::milliseconds delay(1000);
129 threads = new std::thread[thread_pool_size];
130 for (size_t i = 0; i < thread_pool_size; ++i) {
131 threads[i] = std::thread(&SimulatedServer::run, this, delay);
132 }
133 }
134
135 virtual ~SimulatedServer() {
136 Lock l(inner_queue_mtx);
137 finishing = true;
138 inner_queue_cv.notify_all();
139 l.unlock();
140
141 for (size_t i = 0; i < thread_pool_size; ++i) {
142 threads[i].join();
143 }
144
145 delete[] threads;
146
147 delete priority_queue;
148 }
149
150 void post(const TestRequest& request,
151 const ClientId& client_id,
152 const ReqPm& req_params)
153 {
154 time_stats(internal_stats.mtx,
155 internal_stats.add_request_time,
156 [&](){
157 priority_queue->add_request(request, client_id, req_params);
158 });
159 count_stats(internal_stats.mtx,
160 internal_stats.add_request_count);
161 }
162
163 bool has_avail_thread() {
164 InnerQGuard g(inner_queue_mtx);
165 return inner_queue.size() <= thread_pool_size;
166 }
167
168 const Accum& get_accumulator() const { return accumulator; }
169 const Q& get_priority_queue() const { return *priority_queue; }
170 const InternalStats& get_internal_stats() const { return internal_stats; }
171
172 protected:
173
174 void inner_post(const ClientId& client,
175 std::unique_ptr<TestRequest> request,
176 const RespPm& additional) {
177 Lock l(inner_queue_mtx);
178 assert(!finishing);
179 accum_f(accumulator, additional);
180 inner_queue.emplace_back(QueueItem(client,
181 std::move(request),
182 additional));
183 inner_queue_cv.notify_one();
184 }
185
186 void run(std::chrono::milliseconds check_period) {
187 Lock l(inner_queue_mtx);
188 while(true) {
189 while(inner_queue.empty() && !finishing) {
190 inner_queue_cv.wait_for(l, check_period);
191 }
192 if (!inner_queue.empty()) {
193 auto& front = inner_queue.front();
194 auto client = front.client;
195 auto req = std::move(front.request);
196 auto additional = front.additional;
197 inner_queue.pop_front();
198
199 l.unlock();
200
201 // simulation operation by sleeping; then call function to
202 // notify server of completion
203 std::this_thread::sleep_for(op_time);
204
205 TestResponse resp(req->epoch);
206 // TODO: rather than assuming this constructor exists, perhaps
207 // pass in a function that does this mapping?
208 client_resp_f(client, resp, id, additional);
209
210 time_stats(internal_stats.mtx,
211 internal_stats.request_complete_time,
212 [&](){
213 priority_queue->request_completed();
214 });
215 count_stats(internal_stats.mtx,
216 internal_stats.request_complete_count);
217
218 l.lock(); // in prep for next iteration of loop
219 } else {
220 break;
221 }
222 }
223 }
224 }; // class SimulatedServer
225
226 }; // namespace qos_simulation
227 }; // namespace crimson