]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/rgw/test_rgw_dmclock_scheduler.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / test / rgw / test_rgw_dmclock_scheduler.cc
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2018 Red Hat, Inc.
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15//#define BOOST_ASIO_ENABLE_HANDLER_TRACKING
16
17#include "rgw/rgw_dmclock_sync_scheduler.h"
18#include "rgw/rgw_dmclock_async_scheduler.h"
19
20#include <optional>
522d829b 21#include <spawn/spawn.hpp>
11fdf7f2
TL
22#include <gtest/gtest.h>
23#include "acconfig.h"
24#include "global/global_context.h"
25
26namespace rgw::dmclock {
27
28using boost::system::error_code;
29
30// return a lambda that can be used as a callback to capture its arguments
31auto capture(std::optional<error_code>& opt_ec,
32 std::optional<PhaseType>& opt_phase)
33{
34 return [&] (error_code ec, PhaseType phase) {
35 opt_ec = ec;
36 opt_phase = phase;
37 };
38}
39
40TEST(Queue, SyncRequest)
41{
42 ClientCounters counters(g_ceph_context);
43 auto client_info_f = [] (client_id client) -> ClientInfo* {
44 static ClientInfo clients[] = {
45 {1, 1, 1}, //admin: satisfy by reservation
46 {0, 1, 1}, //auth: satisfy by priority
47 };
48 return &clients[static_cast<size_t>(client)];
49 };
50 std::atomic <bool> ready = false;
51 auto server_ready_f = [&ready]() -> bool { return ready.load();};
52
53 SyncScheduler queue(g_ceph_context, std::ref(counters),
54 client_info_f, server_ready_f,
55 std::ref(SyncScheduler::handle_request_cb)
56 );
57
58
59 auto now = get_time();
60 ready = true;
61 queue.add_request(client_id::admin, {}, now, 1);
62 queue.add_request(client_id::auth, {}, now, 1);
63
64 // We can't see the queue at length 1 as the queue len is decremented as the
65 //request is processed
66 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_qlen));
67 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_res));
68 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_prio));
69 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_limit));
70 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_cancel));
71
72 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_qlen));
73 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_res));
74 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_prio));
75 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_limit));
76 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_cancel));
77}
78
11fdf7f2
TL
79TEST(Queue, RateLimit)
80{
81 boost::asio::io_context context;
82 ClientCounters counters(g_ceph_context);
83 AsyncScheduler queue(g_ceph_context, context, std::ref(counters), nullptr,
84 [] (client_id client) -> ClientInfo* {
85 static ClientInfo clients[] = {
86 {1, 1, 1}, // admin
87 {0, 1, 1}, // auth
88 };
89 return &clients[static_cast<size_t>(client)];
90 }, AtLimit::Reject);
91
92 std::optional<error_code> ec1, ec2, ec3, ec4;
93 std::optional<PhaseType> p1, p2, p3, p4;
94
95 auto now = get_time();
96 queue.async_request(client_id::admin, {}, now, 1, capture(ec1, p1));
97 queue.async_request(client_id::admin, {}, now, 1, capture(ec2, p2));
98 queue.async_request(client_id::auth, {}, now, 1, capture(ec3, p3));
99 queue.async_request(client_id::auth, {}, now, 1, capture(ec4, p4));
100 EXPECT_FALSE(ec1);
101 EXPECT_FALSE(ec2);
102 EXPECT_FALSE(ec3);
103 EXPECT_FALSE(ec4);
104
105 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_qlen));
106 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_qlen));
107
522d829b 108 context.run_for(std::chrono::milliseconds(1));
11fdf7f2
TL
109 EXPECT_TRUE(context.stopped());
110
111 ASSERT_TRUE(ec1);
112 EXPECT_EQ(boost::system::errc::success, *ec1);
113 ASSERT_TRUE(p1);
114 EXPECT_EQ(PhaseType::reservation, *p1);
115
116 ASSERT_TRUE(ec2);
117 EXPECT_EQ(boost::system::errc::resource_unavailable_try_again, *ec2);
118
119 ASSERT_TRUE(ec3);
120 EXPECT_EQ(boost::system::errc::success, *ec3);
121 ASSERT_TRUE(p3);
122 EXPECT_EQ(PhaseType::priority, *p3);
123
124 ASSERT_TRUE(ec4);
125 EXPECT_EQ(boost::system::errc::resource_unavailable_try_again, *ec4);
126
127 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_qlen));
128 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_res));
129 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_prio));
130 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_limit));
131 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_cancel));
132
133 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_qlen));
134 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_res));
135 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_prio));
136 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_limit));
137 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_cancel));
138}
139
140TEST(Queue, AsyncRequest)
141{
142 boost::asio::io_context context;
143 ClientCounters counters(g_ceph_context);
144 AsyncScheduler queue(g_ceph_context, context, std::ref(counters), nullptr,
145 [] (client_id client) -> ClientInfo* {
146 static ClientInfo clients[] = {
147 {1, 1, 1}, // admin: satisfy by reservation
148 {0, 1, 1}, // auth: satisfy by priority
149 };
150 return &clients[static_cast<size_t>(client)];
151 }, AtLimit::Reject
152 );
153
154 std::optional<error_code> ec1, ec2;
155 std::optional<PhaseType> p1, p2;
156
157 auto now = get_time();
158 queue.async_request(client_id::admin, {}, now, 1, capture(ec1, p1));
159 queue.async_request(client_id::auth, {}, now, 1, capture(ec2, p2));
160 EXPECT_FALSE(ec1);
161 EXPECT_FALSE(ec2);
162
163 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_qlen));
164 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_qlen));
165
522d829b 166 context.run_for(std::chrono::milliseconds(1));
11fdf7f2
TL
167 EXPECT_TRUE(context.stopped());
168
169 ASSERT_TRUE(ec1);
170 EXPECT_EQ(boost::system::errc::success, *ec1);
171 ASSERT_TRUE(p1);
172 EXPECT_EQ(PhaseType::reservation, *p1);
173
174 ASSERT_TRUE(ec2);
175 EXPECT_EQ(boost::system::errc::success, *ec2);
176 ASSERT_TRUE(p2);
177 EXPECT_EQ(PhaseType::priority, *p2);
178
179 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_qlen));
180 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_res));
181 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_prio));
182 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_limit));
183 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_cancel));
184
185 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_qlen));
186 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_res));
187 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_prio));
188 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_limit));
189 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_cancel));
190}
191
192
193TEST(Queue, Cancel)
194{
195 boost::asio::io_context context;
196 ClientCounters counters(g_ceph_context);
197 AsyncScheduler queue(g_ceph_context, context, std::ref(counters), nullptr,
198 [] (client_id client) -> ClientInfo* {
199 static ClientInfo info{0, 1, 1};
200 return &info;
201 });
202
203 std::optional<error_code> ec1, ec2;
204 std::optional<PhaseType> p1, p2;
205
206 auto now = get_time();
207 queue.async_request(client_id::admin, {}, now, 1, capture(ec1, p1));
208 queue.async_request(client_id::auth, {}, now, 1, capture(ec2, p2));
209 EXPECT_FALSE(ec1);
210 EXPECT_FALSE(ec2);
211
212 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_qlen));
213 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_qlen));
214
215 queue.cancel();
216
217 EXPECT_FALSE(ec1);
218 EXPECT_FALSE(ec2);
219
522d829b 220 context.run_for(std::chrono::milliseconds(1));
11fdf7f2
TL
221 EXPECT_TRUE(context.stopped());
222
223 ASSERT_TRUE(ec1);
224 EXPECT_EQ(boost::asio::error::operation_aborted, *ec1);
225 ASSERT_TRUE(ec2);
226 EXPECT_EQ(boost::asio::error::operation_aborted, *ec2);
227
228 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_qlen));
229 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_res));
230 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_prio));
231 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_limit));
232 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_cancel));
233
234 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_qlen));
235 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_res));
236 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_prio));
237 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_limit));
238 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_cancel));
239}
240
241TEST(Queue, CancelClient)
242{
243 boost::asio::io_context context;
244 ClientCounters counters(g_ceph_context);
245 AsyncScheduler queue(g_ceph_context, context, std::ref(counters), nullptr,
246 [] (client_id client) -> ClientInfo* {
247 static ClientInfo info{0, 1, 1};
248 return &info;
249 });
250
251 std::optional<error_code> ec1, ec2;
252 std::optional<PhaseType> p1, p2;
253
254 auto now = get_time();
255 queue.async_request(client_id::admin, {}, now, 1, capture(ec1, p1));
256 queue.async_request(client_id::auth, {}, now, 1, capture(ec2, p2));
257 EXPECT_FALSE(ec1);
258 EXPECT_FALSE(ec2);
259
260 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_qlen));
261 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_qlen));
262
263 queue.cancel(client_id::admin);
264
265 EXPECT_FALSE(ec1);
266 EXPECT_FALSE(ec2);
267
522d829b 268 context.run_for(std::chrono::milliseconds(1));
11fdf7f2
TL
269 EXPECT_TRUE(context.stopped());
270
271 ASSERT_TRUE(ec1);
272 EXPECT_EQ(boost::asio::error::operation_aborted, *ec1);
273
274 ASSERT_TRUE(ec2);
275 EXPECT_EQ(boost::system::errc::success, *ec2);
276 ASSERT_TRUE(p2);
277 EXPECT_EQ(PhaseType::priority, *p2);
278
279 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_qlen));
280 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_res));
281 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_prio));
282 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_limit));
283 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_cancel));
284
285 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_qlen));
286 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_res));
287 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_prio));
288 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_limit));
289 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_cancel));
290}
291
292TEST(Queue, CancelOnDestructor)
293{
294 boost::asio::io_context context;
295
296 std::optional<error_code> ec1, ec2;
297 std::optional<PhaseType> p1, p2;
298
299 ClientCounters counters(g_ceph_context);
300 {
301 AsyncScheduler queue(g_ceph_context, context, std::ref(counters), nullptr,
302 [] (client_id client) -> ClientInfo* {
303 static ClientInfo info{0, 1, 1};
304 return &info;
305 });
306
307 auto now = get_time();
308 queue.async_request(client_id::admin, {}, now, 1, capture(ec1, p1));
309 queue.async_request(client_id::auth, {}, now, 1, capture(ec2, p2));
310
311 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_qlen));
312 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_qlen));
313 }
314
315 EXPECT_FALSE(ec1);
316 EXPECT_FALSE(ec2);
317
522d829b 318 context.run_for(std::chrono::milliseconds(1));
11fdf7f2
TL
319 EXPECT_TRUE(context.stopped());
320
321 ASSERT_TRUE(ec1);
322 EXPECT_EQ(boost::asio::error::operation_aborted, *ec1);
323 ASSERT_TRUE(ec2);
324 EXPECT_EQ(boost::asio::error::operation_aborted, *ec2);
325
326 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_qlen));
327 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_res));
328 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_prio));
329 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_limit));
330 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_cancel));
331
332 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_qlen));
333 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_res));
334 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_prio));
335 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_limit));
336 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_cancel));
337}
338
339// return a lambda from capture() that's bound to run on the given executor
340template <typename Executor>
341auto capture(const Executor& ex, std::optional<error_code>& opt_ec,
342 std::optional<PhaseType>& opt_res)
343{
344 return boost::asio::bind_executor(ex, capture(opt_ec, opt_res));
345}
346
347TEST(Queue, CrossExecutorRequest)
348{
349 boost::asio::io_context queue_context;
350 ClientCounters counters(g_ceph_context);
351 AsyncScheduler queue(g_ceph_context, queue_context, std::ref(counters), nullptr,
352 [] (client_id client) -> ClientInfo* {
353 static ClientInfo info{0, 1, 1};
354 return &info;
355 });
356
357 // create a separate execution context to use for all callbacks to test that
358 // pending requests maintain executor work guards on both executors
359 boost::asio::io_context callback_context;
360 auto ex2 = callback_context.get_executor();
361
362 std::optional<error_code> ec1, ec2;
363 std::optional<PhaseType> p1, p2;
364
365 auto now = get_time();
366 queue.async_request(client_id::admin, {}, now, 1, capture(ex2, ec1, p1));
367 queue.async_request(client_id::auth, {}, now, 1, capture(ex2, ec2, p2));
368
369 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_qlen));
370 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_qlen));
371
522d829b 372 callback_context.run_for(std::chrono::milliseconds(1));
11fdf7f2
TL
373 // maintains work on callback executor while in queue
374 EXPECT_FALSE(callback_context.stopped());
375
376 EXPECT_FALSE(ec1);
377 EXPECT_FALSE(ec2);
378
522d829b 379 queue_context.run_for(std::chrono::milliseconds(1));
11fdf7f2
TL
380 EXPECT_TRUE(queue_context.stopped());
381
382 EXPECT_FALSE(ec1); // no callbacks until callback executor runs
383 EXPECT_FALSE(ec2);
384
522d829b 385 callback_context.run_for(std::chrono::milliseconds(1));
11fdf7f2
TL
386 EXPECT_TRUE(callback_context.stopped());
387
388 ASSERT_TRUE(ec1);
389 EXPECT_EQ(boost::system::errc::success, *ec1);
390 ASSERT_TRUE(p1);
391 EXPECT_EQ(PhaseType::priority, *p1);
392
393 ASSERT_TRUE(ec2);
394 EXPECT_EQ(boost::system::errc::success, *ec2);
395 ASSERT_TRUE(p2);
396 EXPECT_EQ(PhaseType::priority, *p2);
397}
398
399TEST(Queue, SpawnAsyncRequest)
400{
401 boost::asio::io_context context;
402
20effc67 403 spawn::spawn(context, [&] (yield_context yield) {
11fdf7f2
TL
404 ClientCounters counters(g_ceph_context);
405 AsyncScheduler queue(g_ceph_context, context, std::ref(counters), nullptr,
406 [] (client_id client) -> ClientInfo* {
407 static ClientInfo clients[] = {
408 {1, 1, 1}, // admin: satisfy by reservation
409 {0, 1, 1}, // auth: satisfy by priority
410 };
411 return &clients[static_cast<size_t>(client)];
412 });
413
414 error_code ec1, ec2;
415 auto p1 = queue.async_request(client_id::admin, {}, get_time(), 1, yield[ec1]);
416 EXPECT_EQ(boost::system::errc::success, ec1);
417 EXPECT_EQ(PhaseType::reservation, p1);
418
419 auto p2 = queue.async_request(client_id::auth, {}, get_time(), 1, yield[ec2]);
420 EXPECT_EQ(boost::system::errc::success, ec2);
421 EXPECT_EQ(PhaseType::priority, p2);
422 });
423
522d829b 424 context.run_for(std::chrono::milliseconds(1));
11fdf7f2
TL
425 EXPECT_TRUE(context.stopped());
426}
427
11fdf7f2 428} // namespace rgw::dmclock