]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/rgw/test_rgw_dmclock_scheduler.cc
import ceph quincy 17.2.6
[ceph.git] / ceph / src / test / rgw / test_rgw_dmclock_scheduler.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2018 Red Hat, Inc.
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 //#define BOOST_ASIO_ENABLE_HANDLER_TRACKING
16
17 #include "rgw/rgw_dmclock_sync_scheduler.h"
18 #include "rgw/rgw_dmclock_async_scheduler.h"
19
20 #include <optional>
21 #include <spawn/spawn.hpp>
22 #include <gtest/gtest.h>
23 #include "acconfig.h"
24 #include "global/global_context.h"
25
26 namespace rgw::dmclock {
27
28 using boost::system::error_code;
29
30 // return a lambda that can be used as a callback to capture its arguments
31 auto capture(std::optional<error_code>& opt_ec,
32 std::optional<PhaseType>& opt_phase)
33 {
34 return [&] (error_code ec, PhaseType phase) {
35 opt_ec = ec;
36 opt_phase = phase;
37 };
38 }
39
40 TEST(Queue, SyncRequest)
41 {
42 ClientCounters counters(g_ceph_context);
43 auto client_info_f = [] (client_id client) -> ClientInfo* {
44 static ClientInfo clients[] = {
45 {1, 1, 1}, //admin: satisfy by reservation
46 {0, 1, 1}, //auth: satisfy by priority
47 };
48 return &clients[static_cast<size_t>(client)];
49 };
50 std::atomic <bool> ready = false;
51 auto server_ready_f = [&ready]() -> bool { return ready.load();};
52
53 SyncScheduler queue(g_ceph_context, std::ref(counters),
54 client_info_f, server_ready_f,
55 std::ref(SyncScheduler::handle_request_cb)
56 );
57
58
59 auto now = get_time();
60 ready = true;
61 queue.add_request(client_id::admin, {}, now, 1);
62 queue.add_request(client_id::auth, {}, now, 1);
63
64 // We can't see the queue at length 1 as the queue len is decremented as the
65 //request is processed
66 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_qlen));
67 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_res));
68 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_prio));
69 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_limit));
70 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_cancel));
71
72 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_qlen));
73 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_res));
74 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_prio));
75 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_limit));
76 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_cancel));
77 }
78
79 TEST(Queue, RateLimit)
80 {
81 boost::asio::io_context context;
82 ClientCounters counters(g_ceph_context);
83 AsyncScheduler queue(g_ceph_context, context, std::ref(counters), nullptr,
84 [] (client_id client) -> ClientInfo* {
85 static ClientInfo clients[] = {
86 {1, 1, 1}, // admin
87 {0, 1, 1}, // auth
88 };
89 return &clients[static_cast<size_t>(client)];
90 }, AtLimit::Reject);
91
92 std::optional<error_code> ec1, ec2, ec3, ec4;
93 std::optional<PhaseType> p1, p2, p3, p4;
94
95 auto now = get_time();
96 queue.async_request(client_id::admin, {}, now, 1, capture(ec1, p1));
97 queue.async_request(client_id::admin, {}, now, 1, capture(ec2, p2));
98 queue.async_request(client_id::auth, {}, now, 1, capture(ec3, p3));
99 queue.async_request(client_id::auth, {}, now, 1, capture(ec4, p4));
100 EXPECT_FALSE(ec1);
101 EXPECT_FALSE(ec2);
102 EXPECT_FALSE(ec3);
103 EXPECT_FALSE(ec4);
104
105 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_qlen));
106 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_qlen));
107
108 context.run_for(std::chrono::milliseconds(1));
109 EXPECT_TRUE(context.stopped());
110
111 ASSERT_TRUE(ec1);
112 EXPECT_EQ(boost::system::errc::success, *ec1);
113 ASSERT_TRUE(p1);
114 EXPECT_EQ(PhaseType::reservation, *p1);
115
116 ASSERT_TRUE(ec2);
117 EXPECT_EQ(boost::system::errc::resource_unavailable_try_again, *ec2);
118
119 ASSERT_TRUE(ec3);
120 EXPECT_EQ(boost::system::errc::success, *ec3);
121 ASSERT_TRUE(p3);
122 EXPECT_EQ(PhaseType::priority, *p3);
123
124 ASSERT_TRUE(ec4);
125 EXPECT_EQ(boost::system::errc::resource_unavailable_try_again, *ec4);
126
127 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_qlen));
128 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_res));
129 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_prio));
130 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_limit));
131 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_cancel));
132
133 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_qlen));
134 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_res));
135 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_prio));
136 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_limit));
137 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_cancel));
138 }
139
140 TEST(Queue, AsyncRequest)
141 {
142 boost::asio::io_context context;
143 ClientCounters counters(g_ceph_context);
144 AsyncScheduler queue(g_ceph_context, context, std::ref(counters), nullptr,
145 [] (client_id client) -> ClientInfo* {
146 static ClientInfo clients[] = {
147 {1, 1, 1}, // admin: satisfy by reservation
148 {0, 1, 1}, // auth: satisfy by priority
149 };
150 return &clients[static_cast<size_t>(client)];
151 }, AtLimit::Reject
152 );
153
154 std::optional<error_code> ec1, ec2;
155 std::optional<PhaseType> p1, p2;
156
157 auto now = get_time();
158 queue.async_request(client_id::admin, {}, now, 1, capture(ec1, p1));
159 queue.async_request(client_id::auth, {}, now, 1, capture(ec2, p2));
160 EXPECT_FALSE(ec1);
161 EXPECT_FALSE(ec2);
162
163 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_qlen));
164 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_qlen));
165
166 context.run_for(std::chrono::milliseconds(1));
167 EXPECT_TRUE(context.stopped());
168
169 ASSERT_TRUE(ec1);
170 EXPECT_EQ(boost::system::errc::success, *ec1);
171 ASSERT_TRUE(p1);
172 EXPECT_EQ(PhaseType::reservation, *p1);
173
174 ASSERT_TRUE(ec2);
175 EXPECT_EQ(boost::system::errc::success, *ec2);
176 ASSERT_TRUE(p2);
177 EXPECT_EQ(PhaseType::priority, *p2);
178
179 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_qlen));
180 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_res));
181 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_prio));
182 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_limit));
183 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_cancel));
184
185 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_qlen));
186 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_res));
187 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_prio));
188 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_limit));
189 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_cancel));
190 }
191
192
193 TEST(Queue, Cancel)
194 {
195 boost::asio::io_context context;
196 ClientCounters counters(g_ceph_context);
197 AsyncScheduler queue(g_ceph_context, context, std::ref(counters), nullptr,
198 [] (client_id client) -> ClientInfo* {
199 static ClientInfo info{0, 1, 1};
200 return &info;
201 });
202
203 std::optional<error_code> ec1, ec2;
204 std::optional<PhaseType> p1, p2;
205
206 auto now = get_time();
207 queue.async_request(client_id::admin, {}, now, 1, capture(ec1, p1));
208 queue.async_request(client_id::auth, {}, now, 1, capture(ec2, p2));
209 EXPECT_FALSE(ec1);
210 EXPECT_FALSE(ec2);
211
212 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_qlen));
213 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_qlen));
214
215 queue.cancel();
216
217 EXPECT_FALSE(ec1);
218 EXPECT_FALSE(ec2);
219
220 context.run_for(std::chrono::milliseconds(1));
221 EXPECT_TRUE(context.stopped());
222
223 ASSERT_TRUE(ec1);
224 EXPECT_EQ(boost::asio::error::operation_aborted, *ec1);
225 ASSERT_TRUE(ec2);
226 EXPECT_EQ(boost::asio::error::operation_aborted, *ec2);
227
228 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_qlen));
229 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_res));
230 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_prio));
231 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_limit));
232 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_cancel));
233
234 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_qlen));
235 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_res));
236 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_prio));
237 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_limit));
238 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_cancel));
239 }
240
241 TEST(Queue, CancelClient)
242 {
243 boost::asio::io_context context;
244 ClientCounters counters(g_ceph_context);
245 AsyncScheduler queue(g_ceph_context, context, std::ref(counters), nullptr,
246 [] (client_id client) -> ClientInfo* {
247 static ClientInfo info{0, 1, 1};
248 return &info;
249 });
250
251 std::optional<error_code> ec1, ec2;
252 std::optional<PhaseType> p1, p2;
253
254 auto now = get_time();
255 queue.async_request(client_id::admin, {}, now, 1, capture(ec1, p1));
256 queue.async_request(client_id::auth, {}, now, 1, capture(ec2, p2));
257 EXPECT_FALSE(ec1);
258 EXPECT_FALSE(ec2);
259
260 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_qlen));
261 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_qlen));
262
263 queue.cancel(client_id::admin);
264
265 EXPECT_FALSE(ec1);
266 EXPECT_FALSE(ec2);
267
268 context.run_for(std::chrono::milliseconds(1));
269 EXPECT_TRUE(context.stopped());
270
271 ASSERT_TRUE(ec1);
272 EXPECT_EQ(boost::asio::error::operation_aborted, *ec1);
273
274 ASSERT_TRUE(ec2);
275 EXPECT_EQ(boost::system::errc::success, *ec2);
276 ASSERT_TRUE(p2);
277 EXPECT_EQ(PhaseType::priority, *p2);
278
279 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_qlen));
280 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_res));
281 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_prio));
282 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_limit));
283 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_cancel));
284
285 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_qlen));
286 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_res));
287 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_prio));
288 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_limit));
289 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_cancel));
290 }
291
292 TEST(Queue, CancelOnDestructor)
293 {
294 boost::asio::io_context context;
295
296 std::optional<error_code> ec1, ec2;
297 std::optional<PhaseType> p1, p2;
298
299 ClientCounters counters(g_ceph_context);
300 {
301 AsyncScheduler queue(g_ceph_context, context, std::ref(counters), nullptr,
302 [] (client_id client) -> ClientInfo* {
303 static ClientInfo info{0, 1, 1};
304 return &info;
305 });
306
307 auto now = get_time();
308 queue.async_request(client_id::admin, {}, now, 1, capture(ec1, p1));
309 queue.async_request(client_id::auth, {}, now, 1, capture(ec2, p2));
310
311 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_qlen));
312 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_qlen));
313 }
314
315 EXPECT_FALSE(ec1);
316 EXPECT_FALSE(ec2);
317
318 context.run_for(std::chrono::milliseconds(1));
319 EXPECT_TRUE(context.stopped());
320
321 ASSERT_TRUE(ec1);
322 EXPECT_EQ(boost::asio::error::operation_aborted, *ec1);
323 ASSERT_TRUE(ec2);
324 EXPECT_EQ(boost::asio::error::operation_aborted, *ec2);
325
326 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_qlen));
327 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_res));
328 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_prio));
329 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_limit));
330 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_cancel));
331
332 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_qlen));
333 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_res));
334 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_prio));
335 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_limit));
336 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_cancel));
337 }
338
339 // return a lambda from capture() that's bound to run on the given executor
340 template <typename Executor>
341 auto capture(const Executor& ex, std::optional<error_code>& opt_ec,
342 std::optional<PhaseType>& opt_res)
343 {
344 return boost::asio::bind_executor(ex, capture(opt_ec, opt_res));
345 }
346
347 TEST(Queue, CrossExecutorRequest)
348 {
349 boost::asio::io_context queue_context;
350 ClientCounters counters(g_ceph_context);
351 AsyncScheduler queue(g_ceph_context, queue_context, std::ref(counters), nullptr,
352 [] (client_id client) -> ClientInfo* {
353 static ClientInfo info{0, 1, 1};
354 return &info;
355 });
356
357 // create a separate execution context to use for all callbacks to test that
358 // pending requests maintain executor work guards on both executors
359 boost::asio::io_context callback_context;
360 auto ex2 = callback_context.get_executor();
361
362 std::optional<error_code> ec1, ec2;
363 std::optional<PhaseType> p1, p2;
364
365 auto now = get_time();
366 queue.async_request(client_id::admin, {}, now, 1, capture(ex2, ec1, p1));
367 queue.async_request(client_id::auth, {}, now, 1, capture(ex2, ec2, p2));
368
369 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_qlen));
370 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_qlen));
371
372 callback_context.run_for(std::chrono::milliseconds(1));
373 // maintains work on callback executor while in queue
374 EXPECT_FALSE(callback_context.stopped());
375
376 EXPECT_FALSE(ec1);
377 EXPECT_FALSE(ec2);
378
379 queue_context.run_for(std::chrono::milliseconds(1));
380 EXPECT_TRUE(queue_context.stopped());
381
382 EXPECT_FALSE(ec1); // no callbacks until callback executor runs
383 EXPECT_FALSE(ec2);
384
385 callback_context.run_for(std::chrono::milliseconds(1));
386 EXPECT_TRUE(callback_context.stopped());
387
388 ASSERT_TRUE(ec1);
389 EXPECT_EQ(boost::system::errc::success, *ec1);
390 ASSERT_TRUE(p1);
391 EXPECT_EQ(PhaseType::priority, *p1);
392
393 ASSERT_TRUE(ec2);
394 EXPECT_EQ(boost::system::errc::success, *ec2);
395 ASSERT_TRUE(p2);
396 EXPECT_EQ(PhaseType::priority, *p2);
397 }
398
399 TEST(Queue, SpawnAsyncRequest)
400 {
401 boost::asio::io_context context;
402
403 spawn::spawn(context, [&] (yield_context yield) {
404 ClientCounters counters(g_ceph_context);
405 AsyncScheduler queue(g_ceph_context, context, std::ref(counters), nullptr,
406 [] (client_id client) -> ClientInfo* {
407 static ClientInfo clients[] = {
408 {1, 1, 1}, // admin: satisfy by reservation
409 {0, 1, 1}, // auth: satisfy by priority
410 };
411 return &clients[static_cast<size_t>(client)];
412 });
413
414 error_code ec1, ec2;
415 auto p1 = queue.async_request(client_id::admin, {}, get_time(), 1, yield[ec1]);
416 EXPECT_EQ(boost::system::errc::success, ec1);
417 EXPECT_EQ(PhaseType::reservation, p1);
418
419 auto p2 = queue.async_request(client_id::auth, {}, get_time(), 1, yield[ec2]);
420 EXPECT_EQ(boost::system::errc::success, ec2);
421 EXPECT_EQ(PhaseType::priority, p2);
422 });
423
424 context.run_for(std::chrono::milliseconds(1));
425 EXPECT_TRUE(context.stopped());
426 }
427
428 } // namespace rgw::dmclock