]>
Commit | Line | Data |
---|---|---|
1e59de90 TL |
1 | #include "rgw_ratelimit.h" |
2 | #include "rgw_common.h" | |
20effc67 TL |
3 | #include "random" |
4 | #include <cstdlib> | |
5 | #include <string> | |
6 | #include <boost/asio.hpp> | |
7 | #include <spawn/spawn.hpp> | |
8 | #include <boost/asio/steady_timer.hpp> | |
9 | #include <chrono> | |
10 | #include <mutex> | |
11 | #include <unordered_map> | |
12 | #include <atomic> | |
13 | #include <boost/program_options.hpp> | |
14 | ||
15 | ||
16 | using Executor = boost::asio::io_context::executor_type; | |
17 | std::uniform_int_distribution<unsigned int> dist(0, 1); | |
18 | std::random_device rd; | |
19 | std::default_random_engine rng{rd()}; | |
20 | std::uniform_int_distribution<unsigned long long> disttenant(2, 100000000); | |
21 | struct client_info { | |
22 | uint64_t accepted = 0; | |
23 | uint64_t rejected = 0; | |
24 | uint64_t ops = 0; | |
25 | uint64_t bytes = 0; | |
26 | uint64_t num_retries = 0; | |
27 | std::string tenant; | |
28 | }; | |
29 | ||
30 | struct parameters { | |
31 | int64_t req_size = 1; | |
32 | int64_t backend_bandwidth = 1; | |
33 | size_t wait_between_retries_ms = 1; | |
34 | int num_clients = 1; | |
35 | }; | |
36 | std::shared_ptr<std::vector<client_info>> ds = std::make_shared<std::vector<client_info>>(std::vector<client_info>()); | |
37 | ||
38 | std::string method[2] = {"PUT", "GET"}; | |
39 | void simulate_transfer(client_info& it, const RGWRateLimitInfo* info, std::shared_ptr<RateLimiter> ratelimit, const parameters& params, spawn::yield_context& yield, boost::asio::io_context& ioctx) | |
40 | { | |
41 | auto dout = DoutPrefix(g_ceph_context, ceph_subsys_rgw, "rate limiter: "); | |
42 | boost::asio::steady_timer timer(ioctx); | |
43 | int rw = 0; // will always use PUT method as there is no difference | |
44 | std::string methodop(method[rw]); | |
45 | auto req_size = params.req_size; | |
46 | auto backend_bandwidth = params.backend_bandwidth; | |
47 | // the 4 * 1024 * 1024 is the RGW default we are sending in a typical environment | |
48 | while (req_size) { | |
49 | if (req_size <= backend_bandwidth) { | |
50 | while (req_size > 0) { | |
51 | if(req_size > 4*1024*1024) { | |
52 | ratelimit->decrease_bytes(methodop.c_str(),it.tenant, 4*1024*1024, info); | |
53 | it.bytes += 4*1024*1024; | |
54 | req_size = req_size - 4*1024*1024; | |
55 | } | |
56 | else { | |
57 | ratelimit->decrease_bytes(methodop.c_str(),it.tenant, req_size, info); | |
58 | req_size = 0; | |
59 | } | |
60 | } | |
61 | } else { | |
62 | int64_t total_bytes = 0; | |
63 | while (req_size > 0) { | |
64 | if (req_size >= 4*1024*1024) { | |
65 | if (total_bytes >= backend_bandwidth) | |
66 | { | |
67 | timer.expires_after(std::chrono::seconds(1)); | |
68 | timer.async_wait(yield); | |
69 | total_bytes = 0; | |
70 | } | |
71 | ratelimit->decrease_bytes(methodop.c_str(),it.tenant, 4*1024*1024, info); | |
72 | it.bytes += 4*1024*1024; | |
73 | req_size = req_size - 4*1024*1024; | |
74 | total_bytes += 4*1024*1024; | |
75 | } | |
76 | else { | |
77 | ratelimit->decrease_bytes(methodop.c_str(),it.tenant, req_size, info); | |
78 | it.bytes += req_size; | |
79 | total_bytes += req_size; | |
80 | req_size = 0; | |
81 | } | |
82 | } | |
83 | } | |
84 | } | |
85 | } | |
86 | bool simulate_request(client_info& it, const RGWRateLimitInfo& info, std::shared_ptr<RateLimiter> ratelimit) | |
87 | { | |
88 | boost::asio::io_context context; | |
89 | auto time = ceph::coarse_real_clock::now(); | |
90 | int rw = 0; // will always use PUT method as there is no different | |
91 | std::string methodop = method[rw]; | |
92 | auto dout = DoutPrefix(g_ceph_context, ceph_subsys_rgw, "rate limiter: "); | |
93 | bool to_fail = ratelimit->should_rate_limit(methodop.c_str(), it.tenant, time, &info); | |
94 | if(to_fail) | |
95 | { | |
96 | it.rejected++; | |
97 | it.ops++; | |
98 | return true; | |
99 | } | |
100 | it.accepted++; | |
101 | return false; | |
102 | } | |
103 | void simulate_client(client_info& it, const RGWRateLimitInfo& info, std::shared_ptr<RateLimiter> ratelimit, const parameters& params, spawn::yield_context& ctx, bool& to_run, boost::asio::io_context& ioctx) | |
104 | { | |
105 | for (;;) | |
106 | { | |
107 | bool to_retry = simulate_request(it, info, ratelimit); | |
108 | while (to_retry && to_run) | |
109 | { | |
110 | if (params.wait_between_retries_ms) | |
111 | { | |
112 | boost::asio::steady_timer timer(ioctx); | |
113 | timer.expires_after(std::chrono::milliseconds(params.wait_between_retries_ms)); | |
114 | timer.async_wait(ctx); | |
115 | } | |
116 | to_retry = simulate_request(it, info, ratelimit); | |
117 | } | |
118 | if (!to_run) | |
119 | { | |
120 | return; | |
121 | } | |
122 | simulate_transfer(it, &info, ratelimit, params, ctx, ioctx); | |
123 | } | |
124 | } | |
125 | void simulate_clients(boost::asio::io_context& context, std::string tenant, const RGWRateLimitInfo& info, std::shared_ptr<RateLimiter> ratelimit, const parameters& params, bool& to_run) | |
126 | { | |
127 | for (int i = 0; i < params.num_clients; i++) | |
128 | { | |
129 | auto& it = ds->emplace_back(client_info()); | |
130 | it.tenant = tenant; | |
131 | int x = ds->size() - 1; | |
132 | spawn::spawn(context, | |
133 | [&to_run ,x, ratelimit, info, params, &context](spawn::yield_context ctx) | |
134 | { | |
135 | auto& it = ds.get()->operator[](x); | |
136 | simulate_client(it, info, ratelimit, params, ctx, to_run, context); | |
137 | }); | |
138 | } | |
139 | } | |
140 | int main(int argc, char **argv) | |
141 | { | |
142 | int num_ratelimit_classes = 1; | |
143 | int64_t ops_limit = 1; | |
144 | int64_t bw_limit = 1; | |
145 | int thread_count = 512; | |
146 | int runtime = 60; | |
147 | parameters params; | |
148 | try | |
149 | { | |
150 | using namespace boost::program_options; | |
151 | options_description desc{"Options"}; | |
152 | desc.add_options() | |
153 | ("help,h", "Help screen") | |
154 | ("num_ratelimit_classes", value<int>()->default_value(1), "how many ratelimit tenants") | |
155 | ("request_size", value<int64_t>()->default_value(1), "what is the request size we are testing if 0, it will be randomized") | |
156 | ("backend_bandwidth", value<int64_t>()->default_value(1), "what is the backend bandwidth, so there will be wait between decrease_bytes") | |
157 | ("wait_between_retries_ms", value<size_t>()->default_value(1), "time in seconds to wait between retries") | |
158 | ("ops_limit", value<int64_t>()->default_value(1), "ops limit for the tenants") | |
159 | ("bw_limit", value<int64_t>()->default_value(1), "bytes per second limit") | |
160 | ("threads", value<int>()->default_value(512), "server's threads count") | |
161 | ("runtime", value<int>()->default_value(60), "For how many seconds the test will run") | |
162 | ("num_clients", value<int>()->default_value(1), "number of clients per tenant to run"); | |
163 | variables_map vm; | |
164 | store(parse_command_line(argc, argv, desc), vm); | |
165 | if (vm.count("help")) { | |
166 | std::cout << desc << std::endl; | |
167 | return EXIT_SUCCESS; | |
168 | } | |
169 | num_ratelimit_classes = vm["num_ratelimit_classes"].as<int>(); | |
170 | params.req_size = vm["request_size"].as<int64_t>(); | |
171 | params.backend_bandwidth = vm["backend_bandwidth"].as<int64_t>(); | |
172 | params.wait_between_retries_ms = vm["wait_between_retries_ms"].as<size_t>(); | |
173 | params.num_clients = vm["num_clients"].as<int>(); | |
174 | ops_limit = vm["ops_limit"].as<int64_t>(); | |
175 | bw_limit = vm["bw_limit"].as<int64_t>(); | |
176 | thread_count = vm["threads"].as<int>(); | |
177 | runtime = vm["runtime"].as<int>(); | |
178 | } | |
179 | catch (const boost::program_options::error &ex) | |
180 | { | |
181 | std::cerr << ex.what() << std::endl; | |
182 | return EXIT_FAILURE; | |
183 | } | |
184 | RGWRateLimitInfo info; | |
185 | info.enabled = true; | |
186 | info.max_read_bytes = bw_limit; | |
187 | info.max_write_bytes = bw_limit; | |
188 | info.max_read_ops = ops_limit; | |
189 | info.max_write_ops = ops_limit; | |
190 | std::unique_ptr<CephContext> cct = std::make_unique<CephContext>(CEPH_ENTITY_TYPE_ANY); | |
191 | if (!g_ceph_context) | |
192 | { | |
193 | g_ceph_context = cct.get(); | |
194 | } | |
195 | std::shared_ptr<ActiveRateLimiter> ratelimit(new ActiveRateLimiter(g_ceph_context)); | |
196 | ratelimit->start(); | |
197 | std::vector<std::thread> threads; | |
198 | using Executor = boost::asio::io_context::executor_type; | |
199 | std::optional<boost::asio::executor_work_guard<Executor>> work; | |
200 | threads.reserve(thread_count); | |
201 | boost::asio::io_context context; | |
202 | boost::asio::io_context stopme; | |
203 | work.emplace(boost::asio::make_work_guard(context)); | |
204 | // server execution | |
205 | for (int i = 0; i < thread_count; i++) { | |
206 | threads.emplace_back([&]() noexcept { | |
207 | context.run(); | |
208 | }); | |
209 | } | |
210 | //client execution | |
211 | bool to_run = true; | |
212 | ds->reserve(num_ratelimit_classes*params.num_clients); | |
213 | for (int i = 0; i < num_ratelimit_classes; i++) | |
214 | { | |
215 | unsigned long long tenantid = disttenant(rng); | |
216 | std::string tenantuser = "uuser" + std::to_string(tenantid); | |
217 | simulate_clients(context, tenantuser, info, ratelimit->get_active(), params, to_run); | |
218 | } | |
219 | boost::asio::steady_timer timer_runtime(stopme); | |
220 | timer_runtime.expires_after(std::chrono::seconds(runtime)); | |
221 | timer_runtime.wait(); | |
222 | work.reset(); | |
223 | context.stop(); | |
224 | to_run = false; | |
225 | ||
226 | for (auto& i : threads) | |
227 | { | |
228 | i.join(); | |
229 | } | |
230 | std::unordered_map<std::string,client_info> metrics_by_tenant; | |
231 | for(auto& i : *ds.get()) | |
232 | { | |
233 | auto it = metrics_by_tenant.emplace(i.tenant, client_info()).first; | |
234 | std::cout << i.accepted << std::endl; | |
235 | it->second.accepted += i.accepted; | |
236 | it->second.rejected += i.rejected; | |
237 | } | |
238 | // TODO sum the results by tenant | |
239 | for(auto& i : metrics_by_tenant) | |
240 | { | |
241 | std::cout << "Tenant is: " << i.first << std::endl; | |
242 | std::cout << "Simulator finished accepted sum : " << i.second.accepted << std::endl; | |
243 | std::cout << "Simulator finished rejected sum : " << i.second.rejected << std::endl; | |
244 | } | |
245 | ||
246 | return 0; | |
247 | } |