#include "rgw_ratelimit.h"
#include "rgw_common.h"
#include <random>
#include <cstdlib>
#include <string>
#include <boost/asio.hpp>
#include <spawn/spawn.hpp>
#include <boost/asio/steady_timer.hpp>
#include <chrono>
#include <mutex>
#include <unordered_map>
#include <atomic>
#include <boost/program_options.hpp>


using Executor = boost::asio::io_context::executor_type;
std::uniform_int_distribution<unsigned int> dist(0, 1);
std::random_device rd;
std::default_random_engine rng{rd()};
std::uniform_int_distribution<unsigned long long> disttenant(2, 100000000);
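// Per-client metrics gathered while the benchmark runs: accepted/rejected
// request counts, total ops, bytes pushed through the limiter, retries, and
// the tenant this simulated client belongs to.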
struct client_info {
  uint64_t accepted = 0;
  uint64_t rejected = 0;
  uint64_t ops = 0;
  uint64_t bytes = 0;
  uint64_t num_retries = 0;
  std::string tenant;
};

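// Benchmark parameters taken from the command line: request size and backend
// bandwidth in bytes, wait between retries in milliseconds, and the number of
// simulated clients per tenant.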
struct parameters {
  int64_t req_size = 1;
  int64_t backend_bandwidth = 1;
  size_t wait_between_retries_ms = 1;
  int num_clients = 1;
};
std::shared_ptr<std::vector<client_info>> ds = std::make_shared<std::vector<client_info>>();

std::string method[2] = {"PUT", "GET"};
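// Simulates the data transfer of a single accepted request: the payload is
// charged against the limiter with decrease_bytes() in 4 MiB chunks, and once
// more than the configured backend bandwidth has been sent within a second the
// coroutine sleeps for one second before continuing.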
void simulate_transfer(client_info& it, const RGWRateLimitInfo* info, std::shared_ptr<RateLimiter> ratelimit, const parameters& params, spawn::yield_context& yield, boost::asio::io_context& ioctx)
{
  auto dout = DoutPrefix(g_ceph_context, ceph_subsys_rgw, "rate limiter: ");
  boost::asio::steady_timer timer(ioctx);
  int rw = 0; // will always use the PUT method as there is no difference
  std::string methodop(method[rw]);
  auto req_size = params.req_size;
  auto backend_bandwidth = params.backend_bandwidth;
  // 4*1024*1024 is the default chunk size RGW sends in a typical environment
  while (req_size) {
    if (req_size <= backend_bandwidth) {
      while (req_size > 0) {
        if (req_size > 4*1024*1024) {
          ratelimit->decrease_bytes(methodop.c_str(), it.tenant, 4*1024*1024, info);
          it.bytes += 4*1024*1024;
          req_size = req_size - 4*1024*1024;
        }
        else {
          ratelimit->decrease_bytes(methodop.c_str(), it.tenant, req_size, info);
          it.bytes += req_size;
          req_size = 0;
        }
      }
    } else {
      int64_t total_bytes = 0;
      while (req_size > 0) {
        if (req_size >= 4*1024*1024) {
          if (total_bytes >= backend_bandwidth)
          {
            timer.expires_after(std::chrono::seconds(1));
            timer.async_wait(yield);
            total_bytes = 0;
          }
          ratelimit->decrease_bytes(methodop.c_str(), it.tenant, 4*1024*1024, info);
          it.bytes += 4*1024*1024;
          req_size = req_size - 4*1024*1024;
          total_bytes += 4*1024*1024;
        }
        else {
          ratelimit->decrease_bytes(methodop.c_str(), it.tenant, req_size, info);
          it.bytes += req_size;
          total_bytes += req_size;
          req_size = 0;
        }
      }
    }
  }
}
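// Issues a single simulated request against the limiter and updates the
// client's accepted/rejected counters; returns true when the request was
// rate limited and should be retried.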
bool simulate_request(client_info& it, const RGWRateLimitInfo& info, std::shared_ptr<RateLimiter> ratelimit)
{
  auto time = ceph::coarse_real_clock::now();
  int rw = 0; // will always use the PUT method as there is no difference
  std::string methodop = method[rw];
  auto dout = DoutPrefix(g_ceph_context, ceph_subsys_rgw, "rate limiter: ");
  bool to_fail = ratelimit->should_rate_limit(methodop.c_str(), it.tenant, time, &info);
  if (to_fail)
  {
    it.rejected++;
    it.ops++;
    return true;
  }
  it.accepted++;
  return false;
}
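// Runs one simulated client until to_run is cleared: issue a request, retry
// (optionally waiting wait_between_retries_ms between attempts) while the
// limiter rejects it, then simulate the transfer of the accepted request.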
void simulate_client(client_info& it, const RGWRateLimitInfo& info, std::shared_ptr<RateLimiter> ratelimit, const parameters& params, spawn::yield_context& ctx, bool& to_run, boost::asio::io_context& ioctx)
{
  for (;;)
  {
    bool to_retry = simulate_request(it, info, ratelimit);
    while (to_retry && to_run)
    {
      if (params.wait_between_retries_ms)
      {
        boost::asio::steady_timer timer(ioctx);
        timer.expires_after(std::chrono::milliseconds(params.wait_between_retries_ms));
        timer.async_wait(ctx);
      }
      to_retry = simulate_request(it, info, ratelimit);
    }
    if (!to_run)
    {
      return;
    }
    simulate_transfer(it, &info, ratelimit, params, ctx, ioctx);
  }
}
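// Spawns params.num_clients coroutines for the given tenant on the shared
// io_context; each coroutine runs simulate_client() against its own entry in
// the global metrics vector.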
void simulate_clients(boost::asio::io_context& context, std::string tenant, const RGWRateLimitInfo& info, std::shared_ptr<RateLimiter> ratelimit, const parameters& params, bool& to_run)
{
  for (int i = 0; i < params.num_clients; i++)
  {
    auto& it = ds->emplace_back(client_info());
    it.tenant = tenant;
    int x = ds->size() - 1;
    spawn::spawn(context,
      [&to_run, x, ratelimit, info, params, &context](spawn::yield_context ctx)
      {
        auto& it = (*ds)[x];
        simulate_client(it, info, ratelimit, params, ctx, to_run, context);
      });
  }
}
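// Parses the command line, configures one RGWRateLimitInfo shared by all
// tenants, starts the rate limiter and a pool of threads running a shared
// io_context, spawns the simulated clients, lets them run for `runtime`
// seconds, and finally prints accepted/rejected totals aggregated per tenant.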
int main(int argc, char **argv)
{
  int num_ratelimit_classes = 1;
  int64_t ops_limit = 1;
  int64_t bw_limit = 1;
  int thread_count = 512;
  int runtime = 60;
  parameters params;
  try
  {
    using namespace boost::program_options;
    options_description desc{"Options"};
    desc.add_options()
      ("help,h", "Help screen")
      ("num_ratelimit_classes", value<int>()->default_value(1), "how many ratelimit tenants to simulate")
      ("request_size", value<int64_t>()->default_value(1), "request size in bytes being tested; if 0, it will be randomized")
      ("backend_bandwidth", value<int64_t>()->default_value(1), "backend bandwidth in bytes per second, so there will be a wait between decrease_bytes calls")
      ("wait_between_retries_ms", value<size_t>()->default_value(1), "time in milliseconds to wait between retries")
      ("ops_limit", value<int64_t>()->default_value(1), "ops limit for the tenants")
      ("bw_limit", value<int64_t>()->default_value(1), "bytes per second limit for the tenants")
      ("threads", value<int>()->default_value(512), "server's thread count")
      ("runtime", value<int>()->default_value(60), "for how many seconds the test will run")
      ("num_clients", value<int>()->default_value(1), "number of clients per tenant to run");
    variables_map vm;
    store(parse_command_line(argc, argv, desc), vm);
    if (vm.count("help")) {
      std::cout << desc << std::endl;
      return EXIT_SUCCESS;
    }
    num_ratelimit_classes = vm["num_ratelimit_classes"].as<int>();
    params.req_size = vm["request_size"].as<int64_t>();
    params.backend_bandwidth = vm["backend_bandwidth"].as<int64_t>();
    params.wait_between_retries_ms = vm["wait_between_retries_ms"].as<size_t>();
    params.num_clients = vm["num_clients"].as<int>();
    ops_limit = vm["ops_limit"].as<int64_t>();
    bw_limit = vm["bw_limit"].as<int64_t>();
    thread_count = vm["threads"].as<int>();
    runtime = vm["runtime"].as<int>();
  }
  catch (const boost::program_options::error &ex)
  {
    std::cerr << ex.what() << std::endl;
    return EXIT_FAILURE;
  }
  RGWRateLimitInfo info;
  info.enabled = true;
  info.max_read_bytes = bw_limit;
  info.max_write_bytes = bw_limit;
  info.max_read_ops = ops_limit;
  info.max_write_ops = ops_limit;
  std::unique_ptr<CephContext> cct = std::make_unique<CephContext>(CEPH_ENTITY_TYPE_ANY);
  if (!g_ceph_context)
  {
    g_ceph_context = cct.get();
  }
  std::shared_ptr<ActiveRateLimiter> ratelimit = std::make_shared<ActiveRateLimiter>(g_ceph_context);
  ratelimit->start();
  std::vector<std::thread> threads;
  using Executor = boost::asio::io_context::executor_type;
  std::optional<boost::asio::executor_work_guard<Executor>> work;
  threads.reserve(thread_count);
  boost::asio::io_context context;
  boost::asio::io_context stopme;
  work.emplace(boost::asio::make_work_guard(context));
  // server execution
  for (int i = 0; i < thread_count; i++) {
    threads.emplace_back([&]() noexcept {
      context.run();
    });
  }
  // client execution
  bool to_run = true;
  ds->reserve(num_ratelimit_classes * params.num_clients);
  for (int i = 0; i < num_ratelimit_classes; i++)
  {
    unsigned long long tenantid = disttenant(rng);
    std::string tenantuser = "uuser" + std::to_string(tenantid);
    simulate_clients(context, tenantuser, info, ratelimit->get_active(), params, to_run);
  }
  boost::asio::steady_timer timer_runtime(stopme);
  timer_runtime.expires_after(std::chrono::seconds(runtime));
  timer_runtime.wait();
  work.reset();
  context.stop();
  to_run = false;

  for (auto& i : threads)
  {
    i.join();
  }
  std::unordered_map<std::string, client_info> metrics_by_tenant;
  for (auto& i : *ds)
  {
    auto it = metrics_by_tenant.emplace(i.tenant, client_info()).first;
    std::cout << i.accepted << std::endl;
    it->second.accepted += i.accepted;
    it->second.rejected += i.rejected;
  }
  // print the aggregated results per tenant
  for (auto& i : metrics_by_tenant)
  {
    std::cout << "Tenant is: " << i.first << std::endl;
    std::cout << "Simulator finished accepted sum : " << i.second.accepted << std::endl;
    std::cout << "Simulator finished rejected sum : " << i.second.rejected << std::endl;
  }

  return 0;
}