1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2018 Red Hat, Inc.
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 //#define BOOST_ASIO_ENABLE_HANDLER_TRACKING
17 #include "rgw/rgw_dmclock_sync_scheduler.h"
18 #include "rgw/rgw_dmclock_async_scheduler.h"
21 #ifdef HAVE_BOOST_CONTEXT
22 #include <boost/asio/spawn.hpp>
24 #include <gtest/gtest.h>
26 #include "global/global_context.h"
28 namespace rgw::dmclock
{
30 using boost::system::error_code
;
32 // return a lambda that can be used as a callback to capture its arguments
33 auto capture(std::optional
<error_code
>& opt_ec
,
34 std::optional
<PhaseType
>& opt_phase
)
36 return [&] (error_code ec
, PhaseType phase
) {
42 TEST(Queue
, SyncRequest
)
44 ClientCounters
counters(g_ceph_context
);
45 auto client_info_f
= [] (client_id client
) -> ClientInfo
* {
46 static ClientInfo clients
[] = {
47 {1, 1, 1}, //admin: satisfy by reservation
48 {0, 1, 1}, //auth: satisfy by priority
50 return &clients
[static_cast<size_t>(client
)];
52 std::atomic
<bool> ready
= false;
53 auto server_ready_f
= [&ready
]() -> bool { return ready
.load();};
55 SyncScheduler
queue(g_ceph_context
, std::ref(counters
),
56 client_info_f
, server_ready_f
,
57 std::ref(SyncScheduler::handle_request_cb
)
61 auto now
= get_time();
63 queue
.add_request(client_id::admin
, {}, now
, 1);
64 queue
.add_request(client_id::auth
, {}, now
, 1);
66 // We can't see the queue at length 1 as the queue len is decremented as the
67 //request is processed
68 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_qlen
));
69 EXPECT_EQ(1u, counters(client_id::admin
)->get(queue_counters::l_res
));
70 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_prio
));
71 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_limit
));
72 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_cancel
));
74 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_qlen
));
75 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_res
));
76 EXPECT_EQ(1u, counters(client_id::auth
)->get(queue_counters::l_prio
));
77 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_limit
));
78 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_cancel
));
81 #ifdef HAVE_BOOST_CONTEXT
82 TEST(Queue
, RateLimit
)
84 boost::asio::io_context context
;
85 ClientCounters
counters(g_ceph_context
);
86 AsyncScheduler
queue(g_ceph_context
, context
, std::ref(counters
), nullptr,
87 [] (client_id client
) -> ClientInfo
* {
88 static ClientInfo clients
[] = {
92 return &clients
[static_cast<size_t>(client
)];
95 std::optional
<error_code
> ec1
, ec2
, ec3
, ec4
;
96 std::optional
<PhaseType
> p1
, p2
, p3
, p4
;
98 auto now
= get_time();
99 queue
.async_request(client_id::admin
, {}, now
, 1, capture(ec1
, p1
));
100 queue
.async_request(client_id::admin
, {}, now
, 1, capture(ec2
, p2
));
101 queue
.async_request(client_id::auth
, {}, now
, 1, capture(ec3
, p3
));
102 queue
.async_request(client_id::auth
, {}, now
, 1, capture(ec4
, p4
));
108 EXPECT_EQ(1u, counters(client_id::admin
)->get(queue_counters::l_qlen
));
109 EXPECT_EQ(1u, counters(client_id::auth
)->get(queue_counters::l_qlen
));
112 EXPECT_TRUE(context
.stopped());
115 EXPECT_EQ(boost::system::errc::success
, *ec1
);
117 EXPECT_EQ(PhaseType::reservation
, *p1
);
120 EXPECT_EQ(boost::system::errc::resource_unavailable_try_again
, *ec2
);
123 EXPECT_EQ(boost::system::errc::success
, *ec3
);
125 EXPECT_EQ(PhaseType::priority
, *p3
);
128 EXPECT_EQ(boost::system::errc::resource_unavailable_try_again
, *ec4
);
130 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_qlen
));
131 EXPECT_EQ(1u, counters(client_id::admin
)->get(queue_counters::l_res
));
132 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_prio
));
133 EXPECT_EQ(1u, counters(client_id::admin
)->get(queue_counters::l_limit
));
134 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_cancel
));
136 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_qlen
));
137 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_res
));
138 EXPECT_EQ(1u, counters(client_id::auth
)->get(queue_counters::l_prio
));
139 EXPECT_EQ(1u, counters(client_id::auth
)->get(queue_counters::l_limit
));
140 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_cancel
));
143 TEST(Queue
, AsyncRequest
)
145 boost::asio::io_context context
;
146 ClientCounters
counters(g_ceph_context
);
147 AsyncScheduler
queue(g_ceph_context
, context
, std::ref(counters
), nullptr,
148 [] (client_id client
) -> ClientInfo
* {
149 static ClientInfo clients
[] = {
150 {1, 1, 1}, // admin: satisfy by reservation
151 {0, 1, 1}, // auth: satisfy by priority
153 return &clients
[static_cast<size_t>(client
)];
157 std::optional
<error_code
> ec1
, ec2
;
158 std::optional
<PhaseType
> p1
, p2
;
160 auto now
= get_time();
161 queue
.async_request(client_id::admin
, {}, now
, 1, capture(ec1
, p1
));
162 queue
.async_request(client_id::auth
, {}, now
, 1, capture(ec2
, p2
));
166 EXPECT_EQ(1u, counters(client_id::admin
)->get(queue_counters::l_qlen
));
167 EXPECT_EQ(1u, counters(client_id::auth
)->get(queue_counters::l_qlen
));
170 EXPECT_TRUE(context
.stopped());
173 EXPECT_EQ(boost::system::errc::success
, *ec1
);
175 EXPECT_EQ(PhaseType::reservation
, *p1
);
178 EXPECT_EQ(boost::system::errc::success
, *ec2
);
180 EXPECT_EQ(PhaseType::priority
, *p2
);
182 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_qlen
));
183 EXPECT_EQ(1u, counters(client_id::admin
)->get(queue_counters::l_res
));
184 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_prio
));
185 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_limit
));
186 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_cancel
));
188 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_qlen
));
189 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_res
));
190 EXPECT_EQ(1u, counters(client_id::auth
)->get(queue_counters::l_prio
));
191 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_limit
));
192 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_cancel
));
198 boost::asio::io_context context
;
199 ClientCounters
counters(g_ceph_context
);
200 AsyncScheduler
queue(g_ceph_context
, context
, std::ref(counters
), nullptr,
201 [] (client_id client
) -> ClientInfo
* {
202 static ClientInfo info
{0, 1, 1};
206 std::optional
<error_code
> ec1
, ec2
;
207 std::optional
<PhaseType
> p1
, p2
;
209 auto now
= get_time();
210 queue
.async_request(client_id::admin
, {}, now
, 1, capture(ec1
, p1
));
211 queue
.async_request(client_id::auth
, {}, now
, 1, capture(ec2
, p2
));
215 EXPECT_EQ(1u, counters(client_id::admin
)->get(queue_counters::l_qlen
));
216 EXPECT_EQ(1u, counters(client_id::auth
)->get(queue_counters::l_qlen
));
224 EXPECT_TRUE(context
.stopped());
227 EXPECT_EQ(boost::asio::error::operation_aborted
, *ec1
);
229 EXPECT_EQ(boost::asio::error::operation_aborted
, *ec2
);
231 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_qlen
));
232 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_res
));
233 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_prio
));
234 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_limit
));
235 EXPECT_EQ(1u, counters(client_id::admin
)->get(queue_counters::l_cancel
));
237 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_qlen
));
238 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_res
));
239 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_prio
));
240 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_limit
));
241 EXPECT_EQ(1u, counters(client_id::auth
)->get(queue_counters::l_cancel
));
244 TEST(Queue
, CancelClient
)
246 boost::asio::io_context context
;
247 ClientCounters
counters(g_ceph_context
);
248 AsyncScheduler
queue(g_ceph_context
, context
, std::ref(counters
), nullptr,
249 [] (client_id client
) -> ClientInfo
* {
250 static ClientInfo info
{0, 1, 1};
254 std::optional
<error_code
> ec1
, ec2
;
255 std::optional
<PhaseType
> p1
, p2
;
257 auto now
= get_time();
258 queue
.async_request(client_id::admin
, {}, now
, 1, capture(ec1
, p1
));
259 queue
.async_request(client_id::auth
, {}, now
, 1, capture(ec2
, p2
));
263 EXPECT_EQ(1u, counters(client_id::admin
)->get(queue_counters::l_qlen
));
264 EXPECT_EQ(1u, counters(client_id::auth
)->get(queue_counters::l_qlen
));
266 queue
.cancel(client_id::admin
);
272 EXPECT_TRUE(context
.stopped());
275 EXPECT_EQ(boost::asio::error::operation_aborted
, *ec1
);
278 EXPECT_EQ(boost::system::errc::success
, *ec2
);
280 EXPECT_EQ(PhaseType::priority
, *p2
);
282 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_qlen
));
283 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_res
));
284 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_prio
));
285 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_limit
));
286 EXPECT_EQ(1u, counters(client_id::admin
)->get(queue_counters::l_cancel
));
288 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_qlen
));
289 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_res
));
290 EXPECT_EQ(1u, counters(client_id::auth
)->get(queue_counters::l_prio
));
291 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_limit
));
292 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_cancel
));
295 TEST(Queue
, CancelOnDestructor
)
297 boost::asio::io_context context
;
299 std::optional
<error_code
> ec1
, ec2
;
300 std::optional
<PhaseType
> p1
, p2
;
302 ClientCounters
counters(g_ceph_context
);
304 AsyncScheduler
queue(g_ceph_context
, context
, std::ref(counters
), nullptr,
305 [] (client_id client
) -> ClientInfo
* {
306 static ClientInfo info
{0, 1, 1};
310 auto now
= get_time();
311 queue
.async_request(client_id::admin
, {}, now
, 1, capture(ec1
, p1
));
312 queue
.async_request(client_id::auth
, {}, now
, 1, capture(ec2
, p2
));
314 EXPECT_EQ(1u, counters(client_id::admin
)->get(queue_counters::l_qlen
));
315 EXPECT_EQ(1u, counters(client_id::auth
)->get(queue_counters::l_qlen
));
322 EXPECT_TRUE(context
.stopped());
325 EXPECT_EQ(boost::asio::error::operation_aborted
, *ec1
);
327 EXPECT_EQ(boost::asio::error::operation_aborted
, *ec2
);
329 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_qlen
));
330 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_res
));
331 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_prio
));
332 EXPECT_EQ(0u, counters(client_id::admin
)->get(queue_counters::l_limit
));
333 EXPECT_EQ(1u, counters(client_id::admin
)->get(queue_counters::l_cancel
));
335 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_qlen
));
336 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_res
));
337 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_prio
));
338 EXPECT_EQ(0u, counters(client_id::auth
)->get(queue_counters::l_limit
));
339 EXPECT_EQ(1u, counters(client_id::auth
)->get(queue_counters::l_cancel
));
342 // return a lambda from capture() that's bound to run on the given executor
343 template <typename Executor
>
344 auto capture(const Executor
& ex
, std::optional
<error_code
>& opt_ec
,
345 std::optional
<PhaseType
>& opt_res
)
347 return boost::asio::bind_executor(ex
, capture(opt_ec
, opt_res
));
350 TEST(Queue
, CrossExecutorRequest
)
352 boost::asio::io_context queue_context
;
353 ClientCounters
counters(g_ceph_context
);
354 AsyncScheduler
queue(g_ceph_context
, queue_context
, std::ref(counters
), nullptr,
355 [] (client_id client
) -> ClientInfo
* {
356 static ClientInfo info
{0, 1, 1};
360 // create a separate execution context to use for all callbacks to test that
361 // pending requests maintain executor work guards on both executors
362 boost::asio::io_context callback_context
;
363 auto ex2
= callback_context
.get_executor();
365 std::optional
<error_code
> ec1
, ec2
;
366 std::optional
<PhaseType
> p1
, p2
;
368 auto now
= get_time();
369 queue
.async_request(client_id::admin
, {}, now
, 1, capture(ex2
, ec1
, p1
));
370 queue
.async_request(client_id::auth
, {}, now
, 1, capture(ex2
, ec2
, p2
));
372 EXPECT_EQ(1u, counters(client_id::admin
)->get(queue_counters::l_qlen
));
373 EXPECT_EQ(1u, counters(client_id::auth
)->get(queue_counters::l_qlen
));
375 callback_context
.poll();
376 // maintains work on callback executor while in queue
377 EXPECT_FALSE(callback_context
.stopped());
382 queue_context
.poll();
383 EXPECT_TRUE(queue_context
.stopped());
385 EXPECT_FALSE(ec1
); // no callbacks until callback executor runs
388 callback_context
.poll();
389 EXPECT_TRUE(callback_context
.stopped());
392 EXPECT_EQ(boost::system::errc::success
, *ec1
);
394 EXPECT_EQ(PhaseType::priority
, *p1
);
397 EXPECT_EQ(boost::system::errc::success
, *ec2
);
399 EXPECT_EQ(PhaseType::priority
, *p2
);
402 TEST(Queue
, SpawnAsyncRequest
)
404 boost::asio::io_context context
;
406 boost::asio::spawn(context
, [&] (boost::asio::yield_context yield
) {
407 ClientCounters
counters(g_ceph_context
);
408 AsyncScheduler
queue(g_ceph_context
, context
, std::ref(counters
), nullptr,
409 [] (client_id client
) -> ClientInfo
* {
410 static ClientInfo clients
[] = {
411 {1, 1, 1}, // admin: satisfy by reservation
412 {0, 1, 1}, // auth: satisfy by priority
414 return &clients
[static_cast<size_t>(client
)];
418 auto p1
= queue
.async_request(client_id::admin
, {}, get_time(), 1, yield
[ec1
]);
419 EXPECT_EQ(boost::system::errc::success
, ec1
);
420 EXPECT_EQ(PhaseType::reservation
, p1
);
422 auto p2
= queue
.async_request(client_id::auth
, {}, get_time(), 1, yield
[ec2
]);
423 EXPECT_EQ(boost::system::errc::success
, ec2
);
424 EXPECT_EQ(PhaseType::priority
, p2
);
428 EXPECT_TRUE(context
.stopped());
433 } // namespace rgw::dmclock