#include "rgw_ratelimit.h"
#include "rgw_common.h"

#include <algorithm>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <optional>
#include <random>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

#include <boost/asio.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/program_options.hpp>
#include <spawn/spawn.hpp>
// Executor type of the io_context; used for the work guard in main().
using Executor = boost::asio::io_context::executor_type;
// Uniform 0/1 draw (method index). NOTE(review): appears unused in the
// visible code -- the simulator always picks method[0] (PUT).
std::uniform_int_distribution<unsigned int> dist(0, 1);
// Seed source and engine for tenant-id generation.
std::random_device rd;
std::default_random_engine rng{rd()};
// Random tenant ids in [2, 100000000]; used to build "uuserN" tenant names.
std::uniform_int_distribution<unsigned long long> disttenant(2, 100000000);
// Per-simulated-client metrics.
// NOTE(review): the struct wrappers around these members were lost in
// extraction; reconstructed from the member fragments here and from the
// field accesses visible below (it.tenant, it.bytes, it.accepted,
// it.rejected, params.req_size, params.num_clients) -- confirm against
// the original file.
struct client_info {
  uint64_t accepted = 0;     // requests admitted by the rate limiter
  uint64_t rejected = 0;     // requests rejected (and retried)
  uint64_t bytes = 0;        // total bytes "transferred" by this client
  uint64_t num_retries = 0;  // retry attempts after a rejection
  std::string tenant;        // tenant this client belongs to
};

// Knobs for one simulation run, filled from the command line in main().
struct parameters {
  int64_t req_size = 1;                // request size in bytes
  int64_t backend_bandwidth = 1;       // simulated backend bytes/second
  size_t wait_between_retries_ms = 1;  // sleep between retries (milliseconds)
  int num_clients = 1;                 // clients spawned per tenant
};
36 std::shared_ptr
<std::vector
<client_info
>> ds
= std::make_shared
<std::vector
<client_info
>>(std::vector
<client_info
>());
38 std::string method
[2] = {"PUT", "GET"};
39 void simulate_transfer(client_info
& it
, const RGWRateLimitInfo
* info
, std::shared_ptr
<RateLimiter
> ratelimit
, const parameters
& params
, spawn::yield_context
& yield
, boost::asio::io_context
& ioctx
)
41 auto dout
= DoutPrefix(g_ceph_context
, ceph_subsys_rgw
, "rate limiter: ");
42 boost::asio::steady_timer
timer(ioctx
);
43 int rw
= 0; // will always use PUT method as there is no difference
44 std::string
methodop(method
[rw
]);
45 auto req_size
= params
.req_size
;
46 auto backend_bandwidth
= params
.backend_bandwidth
;
47 // the 4 * 1024 * 1024 is the RGW default we are sending in a typical environment
49 if (req_size
<= backend_bandwidth
) {
50 while (req_size
> 0) {
51 if(req_size
> 4*1024*1024) {
52 ratelimit
->decrease_bytes(methodop
.c_str(),it
.tenant
, 4*1024*1024, info
);
53 it
.bytes
+= 4*1024*1024;
54 req_size
= req_size
- 4*1024*1024;
57 ratelimit
->decrease_bytes(methodop
.c_str(),it
.tenant
, req_size
, info
);
62 int64_t total_bytes
= 0;
63 while (req_size
> 0) {
64 if (req_size
>= 4*1024*1024) {
65 if (total_bytes
>= backend_bandwidth
)
67 timer
.expires_after(std::chrono::seconds(1));
68 timer
.async_wait(yield
);
71 ratelimit
->decrease_bytes(methodop
.c_str(),it
.tenant
, 4*1024*1024, info
);
72 it
.bytes
+= 4*1024*1024;
73 req_size
= req_size
- 4*1024*1024;
74 total_bytes
+= 4*1024*1024;
77 ratelimit
->decrease_bytes(methodop
.c_str(),it
.tenant
, req_size
, info
);
79 total_bytes
+= req_size
;
86 bool simulate_request(client_info
& it
, const RGWRateLimitInfo
& info
, std::shared_ptr
<RateLimiter
> ratelimit
)
88 boost::asio::io_context context
;
89 auto time
= ceph::coarse_real_clock::now();
90 int rw
= 0; // will always use PUT method as there is no different
91 std::string methodop
= method
[rw
];
92 auto dout
= DoutPrefix(g_ceph_context
, ceph_subsys_rgw
, "rate limiter: ");
93 bool to_fail
= ratelimit
->should_rate_limit(methodop
.c_str(), it
.tenant
, time
, &info
);
103 void simulate_client(client_info
& it
, const RGWRateLimitInfo
& info
, std::shared_ptr
<RateLimiter
> ratelimit
, const parameters
& params
, spawn::yield_context
& ctx
, bool& to_run
, boost::asio::io_context
& ioctx
)
107 bool to_retry
= simulate_request(it
, info
, ratelimit
);
108 while (to_retry
&& to_run
)
110 if (params
.wait_between_retries_ms
)
112 boost::asio::steady_timer
timer(ioctx
);
113 timer
.expires_after(std::chrono::milliseconds(params
.wait_between_retries_ms
));
114 timer
.async_wait(ctx
);
116 to_retry
= simulate_request(it
, info
, ratelimit
);
122 simulate_transfer(it
, &info
, ratelimit
, params
, ctx
, ioctx
);
125 void simulate_clients(boost::asio::io_context
& context
, std::string tenant
, const RGWRateLimitInfo
& info
, std::shared_ptr
<RateLimiter
> ratelimit
, const parameters
& params
, bool& to_run
)
127 for (int i
= 0; i
< params
.num_clients
; i
++)
129 auto& it
= ds
->emplace_back(client_info());
131 int x
= ds
->size() - 1;
132 spawn::spawn(context
,
133 [&to_run
,x
, ratelimit
, info
, params
, &context
](spawn::yield_context ctx
)
135 auto& it
= ds
.get()->operator[](x
);
136 simulate_client(it
, info
, ratelimit
, params
, ctx
, to_run
, context
);
140 int main(int argc
, char **argv
)
142 int num_ratelimit_classes
= 1;
143 int64_t ops_limit
= 1;
144 int64_t bw_limit
= 1;
145 int thread_count
= 512;
150 using namespace boost::program_options
;
151 options_description desc
{"Options"};
153 ("help,h", "Help screen")
154 ("num_ratelimit_classes", value
<int>()->default_value(1), "how many ratelimit tenants")
155 ("request_size", value
<int64_t>()->default_value(1), "what is the request size we are testing if 0, it will be randomized")
156 ("backend_bandwidth", value
<int64_t>()->default_value(1), "what is the backend bandwidth, so there will be wait between decrease_bytes")
157 ("wait_between_retries_ms", value
<size_t>()->default_value(1), "time in seconds to wait between retries")
158 ("ops_limit", value
<int64_t>()->default_value(1), "ops limit for the tenants")
159 ("bw_limit", value
<int64_t>()->default_value(1), "bytes per second limit")
160 ("threads", value
<int>()->default_value(512), "server's threads count")
161 ("runtime", value
<int>()->default_value(60), "For how many seconds the test will run")
162 ("num_clients", value
<int>()->default_value(1), "number of clients per tenant to run");
164 store(parse_command_line(argc
, argv
, desc
), vm
);
165 if (vm
.count("help")) {
166 std::cout
<< desc
<< std::endl
;
169 num_ratelimit_classes
= vm
["num_ratelimit_classes"].as
<int>();
170 params
.req_size
= vm
["request_size"].as
<int64_t>();
171 params
.backend_bandwidth
= vm
["backend_bandwidth"].as
<int64_t>();
172 params
.wait_between_retries_ms
= vm
["wait_between_retries_ms"].as
<size_t>();
173 params
.num_clients
= vm
["num_clients"].as
<int>();
174 ops_limit
= vm
["ops_limit"].as
<int64_t>();
175 bw_limit
= vm
["bw_limit"].as
<int64_t>();
176 thread_count
= vm
["threads"].as
<int>();
177 runtime
= vm
["runtime"].as
<int>();
179 catch (const boost::program_options::error
&ex
)
181 std::cerr
<< ex
.what() << std::endl
;
184 RGWRateLimitInfo info
;
186 info
.max_read_bytes
= bw_limit
;
187 info
.max_write_bytes
= bw_limit
;
188 info
.max_read_ops
= ops_limit
;
189 info
.max_write_ops
= ops_limit
;
190 std::unique_ptr
<CephContext
> cct
= std::make_unique
<CephContext
>(CEPH_ENTITY_TYPE_ANY
);
193 g_ceph_context
= cct
.get();
195 std::shared_ptr
<ActiveRateLimiter
> ratelimit(new ActiveRateLimiter(g_ceph_context
));
197 std::vector
<std::thread
> threads
;
198 using Executor
= boost::asio::io_context::executor_type
;
199 std::optional
<boost::asio::executor_work_guard
<Executor
>> work
;
200 threads
.reserve(thread_count
);
201 boost::asio::io_context context
;
202 boost::asio::io_context stopme
;
203 work
.emplace(boost::asio::make_work_guard(context
));
205 for (int i
= 0; i
< thread_count
; i
++) {
206 threads
.emplace_back([&]() noexcept
{
212 ds
->reserve(num_ratelimit_classes
*params
.num_clients
);
213 for (int i
= 0; i
< num_ratelimit_classes
; i
++)
215 unsigned long long tenantid
= disttenant(rng
);
216 std::string tenantuser
= "uuser" + std::to_string(tenantid
);
217 simulate_clients(context
, tenantuser
, info
, ratelimit
->get_active(), params
, to_run
);
219 boost::asio::steady_timer
timer_runtime(stopme
);
220 timer_runtime
.expires_after(std::chrono::seconds(runtime
));
221 timer_runtime
.wait();
226 for (auto& i
: threads
)
230 std::unordered_map
<std::string
,client_info
> metrics_by_tenant
;
231 for(auto& i
: *ds
.get())
233 auto it
= metrics_by_tenant
.emplace(i
.tenant
, client_info()).first
;
234 std::cout
<< i
.accepted
<< std::endl
;
235 it
->second
.accepted
+= i
.accepted
;
236 it
->second
.rejected
+= i
.rejected
;
238 // TODO sum the results by tenant
239 for(auto& i
: metrics_by_tenant
)
241 std::cout
<< "Tenant is: " << i
.first
<< std::endl
;
242 std::cout
<< "Simulator finished accepted sum : " << i
.second
.accepted
<< std::endl
;
243 std::cout
<< "Simulator finished rejected sum : " << i
.second
.rejected
<< std::endl
;