1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2018 Red Hat, Inc.
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 //#define BOOST_ASIO_ENABLE_HANDLER_TRACKING
17 #include "rgw/rgw_dmclock_sync_scheduler.h"
18 #include "rgw/rgw_dmclock_async_scheduler.h"
21 #include <spawn/spawn.hpp>
22 #include <gtest/gtest.h>
24 #include "global/global_context.h"
26 namespace rgw::dmclock
{
28 using boost::system::error_code
;
30 // return a lambda that can be used as a callback to capture its arguments
31 auto capture(std::optional
<error_code
>& opt_ec
,
32 std::optional
<PhaseType
>& opt_phase
)
34 return [&] (error_code ec
, PhaseType phase
) {
40 TEST(Queue
, SyncRequest
)
42 ClientCounters
counters(g_ceph_context
);
43 auto client_info_f
= [] (client_id client
) -> ClientInfo
* {
44 static ClientInfo clients
[] = {
45 {1, 1, 1}, //admin: satisfy by reservation
46 {0, 1, 1}, //auth: satisfy by priority
48 return &clients
[static_cast<size_t>(client
)];
50 std::atomic
<bool> ready
= false;
51 auto server_ready_f
= [&ready
]() -> bool { return ready
.load();};
53 SyncScheduler
queue(g_ceph_context
, std::ref(counters
),
54 client_info_f
, server_ready_f
,
55 std::ref(SyncScheduler::handle_request_cb
)
59 auto now
= get_time();
61 queue
.add_request(client_id::admin
, {}, now
, 1);
62 queue
.add_request(client_id::auth
, {}, now
, 1);
64 // We can't see the queue at length 1 as the queue len is decremented as the
65 //request is processed
66 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_qlen
));
67 EXPECT_EQ(1u, counters(client_id::admin
)->get(queue_counters::l_res
));
68 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_prio
));
69 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_limit
));
70 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_cancel
));
72 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_qlen
));
73 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_res
));
74 EXPECT_EQ(1u, counters(client_id::auth
)->get(queue_counters::l_prio
));
75 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_limit
));
76 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_cancel
));
79 TEST(Queue
, RateLimit
)
81 boost::asio::io_context context
;
82 ClientCounters
counters(g_ceph_context
);
83 AsyncScheduler
queue(g_ceph_context
, context
, std::ref(counters
), nullptr,
84 [] (client_id client
) -> ClientInfo
* {
85 static ClientInfo clients
[] = {
89 return &clients
[static_cast<size_t>(client
)];
92 std::optional
<error_code
> ec1
, ec2
, ec3
, ec4
;
93 std::optional
<PhaseType
> p1
, p2
, p3
, p4
;
95 auto now
= get_time();
96 queue
.async_request(client_id::admin
, {}, now
, 1, capture(ec1
, p1
));
97 queue
.async_request(client_id::admin
, {}, now
, 1, capture(ec2
, p2
));
98 queue
.async_request(client_id::auth
, {}, now
, 1, capture(ec3
, p3
));
99 queue
.async_request(client_id::auth
, {}, now
, 1, capture(ec4
, p4
));
105 EXPECT_EQ(1u, counters(client_id::admin
)->get(queue_counters::l_qlen
));
106 EXPECT_EQ(1u, counters(client_id::auth
)->get(queue_counters::l_qlen
));
108 context
.run_for(std::chrono::milliseconds(1));
109 EXPECT_TRUE(context
.stopped());
112 EXPECT_EQ(boost::system::errc::success
, *ec1
);
114 EXPECT_EQ(PhaseType::reservation
, *p1
);
117 EXPECT_EQ(boost::system::errc::resource_unavailable_try_again
, *ec2
);
120 EXPECT_EQ(boost::system::errc::success
, *ec3
);
122 EXPECT_EQ(PhaseType::priority
, *p3
);
125 EXPECT_EQ(boost::system::errc::resource_unavailable_try_again
, *ec4
);
127 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_qlen
));
128 EXPECT_EQ(1u, counters(client_id::admin
)->get(queue_counters::l_res
));
129 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_prio
));
130 EXPECT_EQ(1u, counters(client_id::admin
)->get(queue_counters::l_limit
));
131 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_cancel
));
133 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_qlen
));
134 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_res
));
135 EXPECT_EQ(1u, counters(client_id::auth
)->get(queue_counters::l_prio
));
136 EXPECT_EQ(1u, counters(client_id::auth
)->get(queue_counters::l_limit
));
137 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_cancel
));
140 TEST(Queue
, AsyncRequest
)
142 boost::asio::io_context context
;
143 ClientCounters
counters(g_ceph_context
);
144 AsyncScheduler
queue(g_ceph_context
, context
, std::ref(counters
), nullptr,
145 [] (client_id client
) -> ClientInfo
* {
146 static ClientInfo clients
[] = {
147 {1, 1, 1}, // admin: satisfy by reservation
148 {0, 1, 1}, // auth: satisfy by priority
150 return &clients
[static_cast<size_t>(client
)];
154 std::optional
<error_code
> ec1
, ec2
;
155 std::optional
<PhaseType
> p1
, p2
;
157 auto now
= get_time();
158 queue
.async_request(client_id::admin
, {}, now
, 1, capture(ec1
, p1
));
159 queue
.async_request(client_id::auth
, {}, now
, 1, capture(ec2
, p2
));
163 EXPECT_EQ(1u, counters(client_id::admin
)->get(queue_counters::l_qlen
));
164 EXPECT_EQ(1u, counters(client_id::auth
)->get(queue_counters::l_qlen
));
166 context
.run_for(std::chrono::milliseconds(1));
167 EXPECT_TRUE(context
.stopped());
170 EXPECT_EQ(boost::system::errc::success
, *ec1
);
172 EXPECT_EQ(PhaseType::reservation
, *p1
);
175 EXPECT_EQ(boost::system::errc::success
, *ec2
);
177 EXPECT_EQ(PhaseType::priority
, *p2
);
179 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_qlen
));
180 EXPECT_EQ(1u, counters(client_id::admin
)->get(queue_counters::l_res
));
181 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_prio
));
182 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_limit
));
183 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_cancel
));
185 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_qlen
));
186 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_res
));
187 EXPECT_EQ(1u, counters(client_id::auth
)->get(queue_counters::l_prio
));
188 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_limit
));
189 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_cancel
));
195 boost::asio::io_context context
;
196 ClientCounters
counters(g_ceph_context
);
197 AsyncScheduler
queue(g_ceph_context
, context
, std::ref(counters
), nullptr,
198 [] (client_id client
) -> ClientInfo
* {
199 static ClientInfo info
{0, 1, 1};
203 std::optional
<error_code
> ec1
, ec2
;
204 std::optional
<PhaseType
> p1
, p2
;
206 auto now
= get_time();
207 queue
.async_request(client_id::admin
, {}, now
, 1, capture(ec1
, p1
));
208 queue
.async_request(client_id::auth
, {}, now
, 1, capture(ec2
, p2
));
212 EXPECT_EQ(1u, counters(client_id::admin
)->get(queue_counters::l_qlen
));
213 EXPECT_EQ(1u, counters(client_id::auth
)->get(queue_counters::l_qlen
));
220 context
.run_for(std::chrono::milliseconds(1));
221 EXPECT_TRUE(context
.stopped());
224 EXPECT_EQ(boost::asio::error::operation_aborted
, *ec1
);
226 EXPECT_EQ(boost::asio::error::operation_aborted
, *ec2
);
228 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_qlen
));
229 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_res
));
230 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_prio
));
231 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_limit
));
232 EXPECT_EQ(1u, counters(client_id::admin
)->get(queue_counters::l_cancel
));
234 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_qlen
));
235 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_res
));
236 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_prio
));
237 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_limit
));
238 EXPECT_EQ(1u, counters(client_id::auth
)->get(queue_counters::l_cancel
));
241 TEST(Queue
, CancelClient
)
243 boost::asio::io_context context
;
244 ClientCounters
counters(g_ceph_context
);
245 AsyncScheduler
queue(g_ceph_context
, context
, std::ref(counters
), nullptr,
246 [] (client_id client
) -> ClientInfo
* {
247 static ClientInfo info
{0, 1, 1};
251 std::optional
<error_code
> ec1
, ec2
;
252 std::optional
<PhaseType
> p1
, p2
;
254 auto now
= get_time();
255 queue
.async_request(client_id::admin
, {}, now
, 1, capture(ec1
, p1
));
256 queue
.async_request(client_id::auth
, {}, now
, 1, capture(ec2
, p2
));
260 EXPECT_EQ(1u, counters(client_id::admin
)->get(queue_counters::l_qlen
));
261 EXPECT_EQ(1u, counters(client_id::auth
)->get(queue_counters::l_qlen
));
263 queue
.cancel(client_id::admin
);
268 context
.run_for(std::chrono::milliseconds(1));
269 EXPECT_TRUE(context
.stopped());
272 EXPECT_EQ(boost::asio::error::operation_aborted
, *ec1
);
275 EXPECT_EQ(boost::system::errc::success
, *ec2
);
277 EXPECT_EQ(PhaseType::priority
, *p2
);
279 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_qlen
));
280 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_res
));
281 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_prio
));
282 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_limit
));
283 EXPECT_EQ(1u, counters(client_id::admin
)->get(queue_counters::l_cancel
));
285 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_qlen
));
286 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_res
));
287 EXPECT_EQ(1u, counters(client_id::auth
)->get(queue_counters::l_prio
));
288 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_limit
));
289 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_cancel
));
292 TEST(Queue
, CancelOnDestructor
)
294 boost::asio::io_context context
;
296 std::optional
<error_code
> ec1
, ec2
;
297 std::optional
<PhaseType
> p1
, p2
;
299 ClientCounters
counters(g_ceph_context
);
301 AsyncScheduler
queue(g_ceph_context
, context
, std::ref(counters
), nullptr,
302 [] (client_id client
) -> ClientInfo
* {
303 static ClientInfo info
{0, 1, 1};
307 auto now
= get_time();
308 queue
.async_request(client_id::admin
, {}, now
, 1, capture(ec1
, p1
));
309 queue
.async_request(client_id::auth
, {}, now
, 1, capture(ec2
, p2
));
311 EXPECT_EQ(1u, counters(client_id::admin
)->get(queue_counters::l_qlen
));
312 EXPECT_EQ(1u, counters(client_id::auth
)->get(queue_counters::l_qlen
));
318 context
.run_for(std::chrono::milliseconds(1));
319 EXPECT_TRUE(context
.stopped());
322 EXPECT_EQ(boost::asio::error::operation_aborted
, *ec1
);
324 EXPECT_EQ(boost::asio::error::operation_aborted
, *ec2
);
326 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_qlen
));
327 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_res
));
328 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_prio
));
329 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_limit
));
330 EXPECT_EQ(1u, counters(client_id::admin
)->get(queue_counters::l_cancel
));
332 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_qlen
));
333 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_res
));
334 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_prio
));
335 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_limit
));
336 EXPECT_EQ(1u, counters(client_id::auth
)->get(queue_counters::l_cancel
));
339 // return a lambda from capture() that's bound to run on the given executor
340 template <typename Executor
>
341 auto capture(const Executor
& ex
, std::optional
<error_code
>& opt_ec
,
342 std::optional
<PhaseType
>& opt_res
)
344 return boost::asio::bind_executor(ex
, capture(opt_ec
, opt_res
));
347 TEST(Queue
, CrossExecutorRequest
)
349 boost::asio::io_context queue_context
;
350 ClientCounters
counters(g_ceph_context
);
351 AsyncScheduler
queue(g_ceph_context
, queue_context
, std::ref(counters
), nullptr,
352 [] (client_id client
) -> ClientInfo
* {
353 static ClientInfo info
{0, 1, 1};
357 // create a separate execution context to use for all callbacks to test that
358 // pending requests maintain executor work guards on both executors
359 boost::asio::io_context callback_context
;
360 auto ex2
= callback_context
.get_executor();
362 std::optional
<error_code
> ec1
, ec2
;
363 std::optional
<PhaseType
> p1
, p2
;
365 auto now
= get_time();
366 queue
.async_request(client_id::admin
, {}, now
, 1, capture(ex2
, ec1
, p1
));
367 queue
.async_request(client_id::auth
, {}, now
, 1, capture(ex2
, ec2
, p2
));
369 EXPECT_EQ(1u, counters(client_id::admin
)->get(queue_counters::l_qlen
));
370 EXPECT_EQ(1u, counters(client_id::auth
)->get(queue_counters::l_qlen
));
372 callback_context
.run_for(std::chrono::milliseconds(1));
373 // maintains work on callback executor while in queue
374 EXPECT_FALSE(callback_context
.stopped());
379 queue_context
.run_for(std::chrono::milliseconds(1));
380 EXPECT_TRUE(queue_context
.stopped());
382 EXPECT_FALSE(ec1
); // no callbacks until callback executor runs
385 callback_context
.run_for(std::chrono::milliseconds(1));
386 EXPECT_TRUE(callback_context
.stopped());
389 EXPECT_EQ(boost::system::errc::success
, *ec1
);
391 EXPECT_EQ(PhaseType::priority
, *p1
);
394 EXPECT_EQ(boost::system::errc::success
, *ec2
);
396 EXPECT_EQ(PhaseType::priority
, *p2
);
399 TEST(Queue
, SpawnAsyncRequest
)
401 boost::asio::io_context context
;
403 spawn::spawn(context
, [&] (yield_context yield
) {
404 ClientCounters
counters(g_ceph_context
);
405 AsyncScheduler
queue(g_ceph_context
, context
, std::ref(counters
), nullptr,
406 [] (client_id client
) -> ClientInfo
* {
407 static ClientInfo clients
[] = {
408 {1, 1, 1}, // admin: satisfy by reservation
409 {0, 1, 1}, // auth: satisfy by priority
411 return &clients
[static_cast<size_t>(client
)];
415 auto p1
= queue
.async_request(client_id::admin
, {}, get_time(), 1, yield
[ec1
]);
416 EXPECT_EQ(boost::system::errc::success
, ec1
);
417 EXPECT_EQ(PhaseType::reservation
, p1
);
419 auto p2
= queue
.async_request(client_id::auth
, {}, get_time(), 1, yield
[ec2
]);
420 EXPECT_EQ(boost::system::errc::success
, ec2
);
421 EXPECT_EQ(PhaseType::priority
, p2
);
424 context
.run_for(std::chrono::milliseconds(1));
425 EXPECT_TRUE(context
.stopped());
428 } // namespace rgw::dmclock