]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/rgw/test_rgw_dmclock_scheduler.cc
import ceph 14.2.5
[ceph.git] / ceph / src / test / rgw / test_rgw_dmclock_scheduler.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2018 Red Hat, Inc.
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 //#define BOOST_ASIO_ENABLE_HANDLER_TRACKING
16
17 #include "rgw/rgw_dmclock_sync_scheduler.h"
18 #include "rgw/rgw_dmclock_async_scheduler.h"
19
20 #include <optional>
21 #ifdef HAVE_BOOST_CONTEXT
22 #include <boost/asio/spawn.hpp>
23 #endif
24 #include <gtest/gtest.h>
25 #include "acconfig.h"
26 #include "global/global_context.h"
27
28 namespace rgw::dmclock {
29
30 using boost::system::error_code;
31
32 // return a lambda that can be used as a callback to capture its arguments
33 auto capture(std::optional<error_code>& opt_ec,
34 std::optional<PhaseType>& opt_phase)
35 {
36 return [&] (error_code ec, PhaseType phase) {
37 opt_ec = ec;
38 opt_phase = phase;
39 };
40 }
41
42 TEST(Queue, SyncRequest)
43 {
44 ClientCounters counters(g_ceph_context);
45 auto client_info_f = [] (client_id client) -> ClientInfo* {
46 static ClientInfo clients[] = {
47 {1, 1, 1}, //admin: satisfy by reservation
48 {0, 1, 1}, //auth: satisfy by priority
49 };
50 return &clients[static_cast<size_t>(client)];
51 };
52 std::atomic <bool> ready = false;
53 auto server_ready_f = [&ready]() -> bool { return ready.load();};
54
55 SyncScheduler queue(g_ceph_context, std::ref(counters),
56 client_info_f, server_ready_f,
57 std::ref(SyncScheduler::handle_request_cb)
58 );
59
60
61 auto now = get_time();
62 ready = true;
63 queue.add_request(client_id::admin, {}, now, 1);
64 queue.add_request(client_id::auth, {}, now, 1);
65
66 // We can't see the queue at length 1 as the queue len is decremented as the
67 //request is processed
68 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_qlen));
69 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_res));
70 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_prio));
71 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_limit));
72 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_cancel));
73
74 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_qlen));
75 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_res));
76 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_prio));
77 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_limit));
78 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_cancel));
79 }
80
81 #ifdef HAVE_BOOST_CONTEXT
82 TEST(Queue, RateLimit)
83 {
84 boost::asio::io_context context;
85 ClientCounters counters(g_ceph_context);
86 AsyncScheduler queue(g_ceph_context, context, std::ref(counters), nullptr,
87 [] (client_id client) -> ClientInfo* {
88 static ClientInfo clients[] = {
89 {1, 1, 1}, // admin
90 {0, 1, 1}, // auth
91 };
92 return &clients[static_cast<size_t>(client)];
93 }, AtLimit::Reject);
94
95 std::optional<error_code> ec1, ec2, ec3, ec4;
96 std::optional<PhaseType> p1, p2, p3, p4;
97
98 auto now = get_time();
99 queue.async_request(client_id::admin, {}, now, 1, capture(ec1, p1));
100 queue.async_request(client_id::admin, {}, now, 1, capture(ec2, p2));
101 queue.async_request(client_id::auth, {}, now, 1, capture(ec3, p3));
102 queue.async_request(client_id::auth, {}, now, 1, capture(ec4, p4));
103 EXPECT_FALSE(ec1);
104 EXPECT_FALSE(ec2);
105 EXPECT_FALSE(ec3);
106 EXPECT_FALSE(ec4);
107
108 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_qlen));
109 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_qlen));
110
111 context.poll();
112 EXPECT_TRUE(context.stopped());
113
114 ASSERT_TRUE(ec1);
115 EXPECT_EQ(boost::system::errc::success, *ec1);
116 ASSERT_TRUE(p1);
117 EXPECT_EQ(PhaseType::reservation, *p1);
118
119 ASSERT_TRUE(ec2);
120 EXPECT_EQ(boost::system::errc::resource_unavailable_try_again, *ec2);
121
122 ASSERT_TRUE(ec3);
123 EXPECT_EQ(boost::system::errc::success, *ec3);
124 ASSERT_TRUE(p3);
125 EXPECT_EQ(PhaseType::priority, *p3);
126
127 ASSERT_TRUE(ec4);
128 EXPECT_EQ(boost::system::errc::resource_unavailable_try_again, *ec4);
129
130 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_qlen));
131 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_res));
132 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_prio));
133 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_limit));
134 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_cancel));
135
136 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_qlen));
137 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_res));
138 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_prio));
139 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_limit));
140 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_cancel));
141 }
142
143 TEST(Queue, AsyncRequest)
144 {
145 boost::asio::io_context context;
146 ClientCounters counters(g_ceph_context);
147 AsyncScheduler queue(g_ceph_context, context, std::ref(counters), nullptr,
148 [] (client_id client) -> ClientInfo* {
149 static ClientInfo clients[] = {
150 {1, 1, 1}, // admin: satisfy by reservation
151 {0, 1, 1}, // auth: satisfy by priority
152 };
153 return &clients[static_cast<size_t>(client)];
154 }, AtLimit::Reject
155 );
156
157 std::optional<error_code> ec1, ec2;
158 std::optional<PhaseType> p1, p2;
159
160 auto now = get_time();
161 queue.async_request(client_id::admin, {}, now, 1, capture(ec1, p1));
162 queue.async_request(client_id::auth, {}, now, 1, capture(ec2, p2));
163 EXPECT_FALSE(ec1);
164 EXPECT_FALSE(ec2);
165
166 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_qlen));
167 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_qlen));
168
169 context.poll();
170 EXPECT_TRUE(context.stopped());
171
172 ASSERT_TRUE(ec1);
173 EXPECT_EQ(boost::system::errc::success, *ec1);
174 ASSERT_TRUE(p1);
175 EXPECT_EQ(PhaseType::reservation, *p1);
176
177 ASSERT_TRUE(ec2);
178 EXPECT_EQ(boost::system::errc::success, *ec2);
179 ASSERT_TRUE(p2);
180 EXPECT_EQ(PhaseType::priority, *p2);
181
182 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_qlen));
183 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_res));
184 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_prio));
185 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_limit));
186 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_cancel));
187
188 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_qlen));
189 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_res));
190 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_prio));
191 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_limit));
192 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_cancel));
193 }
194
195
196 TEST(Queue, Cancel)
197 {
198 boost::asio::io_context context;
199 ClientCounters counters(g_ceph_context);
200 AsyncScheduler queue(g_ceph_context, context, std::ref(counters), nullptr,
201 [] (client_id client) -> ClientInfo* {
202 static ClientInfo info{0, 1, 1};
203 return &info;
204 });
205
206 std::optional<error_code> ec1, ec2;
207 std::optional<PhaseType> p1, p2;
208
209 auto now = get_time();
210 queue.async_request(client_id::admin, {}, now, 1, capture(ec1, p1));
211 queue.async_request(client_id::auth, {}, now, 1, capture(ec2, p2));
212 EXPECT_FALSE(ec1);
213 EXPECT_FALSE(ec2);
214
215 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_qlen));
216 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_qlen));
217
218 queue.cancel();
219
220 EXPECT_FALSE(ec1);
221 EXPECT_FALSE(ec2);
222
223 context.poll();
224 EXPECT_TRUE(context.stopped());
225
226 ASSERT_TRUE(ec1);
227 EXPECT_EQ(boost::asio::error::operation_aborted, *ec1);
228 ASSERT_TRUE(ec2);
229 EXPECT_EQ(boost::asio::error::operation_aborted, *ec2);
230
231 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_qlen));
232 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_res));
233 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_prio));
234 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_limit));
235 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_cancel));
236
237 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_qlen));
238 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_res));
239 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_prio));
240 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_limit));
241 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_cancel));
242 }
243
244 TEST(Queue, CancelClient)
245 {
246 boost::asio::io_context context;
247 ClientCounters counters(g_ceph_context);
248 AsyncScheduler queue(g_ceph_context, context, std::ref(counters), nullptr,
249 [] (client_id client) -> ClientInfo* {
250 static ClientInfo info{0, 1, 1};
251 return &info;
252 });
253
254 std::optional<error_code> ec1, ec2;
255 std::optional<PhaseType> p1, p2;
256
257 auto now = get_time();
258 queue.async_request(client_id::admin, {}, now, 1, capture(ec1, p1));
259 queue.async_request(client_id::auth, {}, now, 1, capture(ec2, p2));
260 EXPECT_FALSE(ec1);
261 EXPECT_FALSE(ec2);
262
263 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_qlen));
264 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_qlen));
265
266 queue.cancel(client_id::admin);
267
268 EXPECT_FALSE(ec1);
269 EXPECT_FALSE(ec2);
270
271 context.poll();
272 EXPECT_TRUE(context.stopped());
273
274 ASSERT_TRUE(ec1);
275 EXPECT_EQ(boost::asio::error::operation_aborted, *ec1);
276
277 ASSERT_TRUE(ec2);
278 EXPECT_EQ(boost::system::errc::success, *ec2);
279 ASSERT_TRUE(p2);
280 EXPECT_EQ(PhaseType::priority, *p2);
281
282 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_qlen));
283 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_res));
284 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_prio));
285 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_limit));
286 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_cancel));
287
288 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_qlen));
289 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_res));
290 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_prio));
291 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_limit));
292 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_cancel));
293 }
294
295 TEST(Queue, CancelOnDestructor)
296 {
297 boost::asio::io_context context;
298
299 std::optional<error_code> ec1, ec2;
300 std::optional<PhaseType> p1, p2;
301
302 ClientCounters counters(g_ceph_context);
303 {
304 AsyncScheduler queue(g_ceph_context, context, std::ref(counters), nullptr,
305 [] (client_id client) -> ClientInfo* {
306 static ClientInfo info{0, 1, 1};
307 return &info;
308 });
309
310 auto now = get_time();
311 queue.async_request(client_id::admin, {}, now, 1, capture(ec1, p1));
312 queue.async_request(client_id::auth, {}, now, 1, capture(ec2, p2));
313
314 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_qlen));
315 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_qlen));
316 }
317
318 EXPECT_FALSE(ec1);
319 EXPECT_FALSE(ec2);
320
321 context.poll();
322 EXPECT_TRUE(context.stopped());
323
324 ASSERT_TRUE(ec1);
325 EXPECT_EQ(boost::asio::error::operation_aborted, *ec1);
326 ASSERT_TRUE(ec2);
327 EXPECT_EQ(boost::asio::error::operation_aborted, *ec2);
328
329 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_qlen));
330 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_res));
331 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_prio));
332 EXPECT_EQ(0u, counters(client_id::admin)->get(queue_counters::l_limit));
333 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_cancel));
334
335 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_qlen));
336 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_res));
337 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_prio));
338 EXPECT_EQ(0u, counters(client_id::auth)->get(queue_counters::l_limit));
339 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_cancel));
340 }
341
342 // return a lambda from capture() that's bound to run on the given executor
343 template <typename Executor>
344 auto capture(const Executor& ex, std::optional<error_code>& opt_ec,
345 std::optional<PhaseType>& opt_res)
346 {
347 return boost::asio::bind_executor(ex, capture(opt_ec, opt_res));
348 }
349
350 TEST(Queue, CrossExecutorRequest)
351 {
352 boost::asio::io_context queue_context;
353 ClientCounters counters(g_ceph_context);
354 AsyncScheduler queue(g_ceph_context, queue_context, std::ref(counters), nullptr,
355 [] (client_id client) -> ClientInfo* {
356 static ClientInfo info{0, 1, 1};
357 return &info;
358 });
359
360 // create a separate execution context to use for all callbacks to test that
361 // pending requests maintain executor work guards on both executors
362 boost::asio::io_context callback_context;
363 auto ex2 = callback_context.get_executor();
364
365 std::optional<error_code> ec1, ec2;
366 std::optional<PhaseType> p1, p2;
367
368 auto now = get_time();
369 queue.async_request(client_id::admin, {}, now, 1, capture(ex2, ec1, p1));
370 queue.async_request(client_id::auth, {}, now, 1, capture(ex2, ec2, p2));
371
372 EXPECT_EQ(1u, counters(client_id::admin)->get(queue_counters::l_qlen));
373 EXPECT_EQ(1u, counters(client_id::auth)->get(queue_counters::l_qlen));
374
375 callback_context.poll();
376 // maintains work on callback executor while in queue
377 EXPECT_FALSE(callback_context.stopped());
378
379 EXPECT_FALSE(ec1);
380 EXPECT_FALSE(ec2);
381
382 queue_context.poll();
383 EXPECT_TRUE(queue_context.stopped());
384
385 EXPECT_FALSE(ec1); // no callbacks until callback executor runs
386 EXPECT_FALSE(ec2);
387
388 callback_context.poll();
389 EXPECT_TRUE(callback_context.stopped());
390
391 ASSERT_TRUE(ec1);
392 EXPECT_EQ(boost::system::errc::success, *ec1);
393 ASSERT_TRUE(p1);
394 EXPECT_EQ(PhaseType::priority, *p1);
395
396 ASSERT_TRUE(ec2);
397 EXPECT_EQ(boost::system::errc::success, *ec2);
398 ASSERT_TRUE(p2);
399 EXPECT_EQ(PhaseType::priority, *p2);
400 }
401
402 TEST(Queue, SpawnAsyncRequest)
403 {
404 boost::asio::io_context context;
405
406 boost::asio::spawn(context, [&] (boost::asio::yield_context yield) {
407 ClientCounters counters(g_ceph_context);
408 AsyncScheduler queue(g_ceph_context, context, std::ref(counters), nullptr,
409 [] (client_id client) -> ClientInfo* {
410 static ClientInfo clients[] = {
411 {1, 1, 1}, // admin: satisfy by reservation
412 {0, 1, 1}, // auth: satisfy by priority
413 };
414 return &clients[static_cast<size_t>(client)];
415 });
416
417 error_code ec1, ec2;
418 auto p1 = queue.async_request(client_id::admin, {}, get_time(), 1, yield[ec1]);
419 EXPECT_EQ(boost::system::errc::success, ec1);
420 EXPECT_EQ(PhaseType::reservation, p1);
421
422 auto p2 = queue.async_request(client_id::auth, {}, get_time(), 1, yield[ec2]);
423 EXPECT_EQ(boost::system::errc::success, ec2);
424 EXPECT_EQ(PhaseType::priority, p2);
425 });
426
427 context.poll();
428 EXPECT_TRUE(context.stopped());
429 }
430
431 #endif
432
433 } // namespace rgw::dmclock