]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/cls_queue/test_cls_queue.cc
import 15.2.4
[ceph.git] / ceph / src / test / cls_queue / test_cls_queue.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "include/types.h"
5
6#include "cls/queue/cls_queue_types.h"
7#include "cls/queue/cls_queue_client.h"
8#include "cls/queue/cls_queue_ops.h"
9
10#include "gtest/gtest.h"
11#include "test/librados/test_cxx.h"
12#include "global/global_context.h"
13
14#include <string>
15#include <vector>
16#include <algorithm>
17#include <thread>
18#include <chrono>
19#include <atomic>
20
21class TestClsQueue : public ::testing::Test {
22protected:
23 librados::Rados rados;
24 std::string pool_name;
25 librados::IoCtx ioctx;
26
27 void SetUp() override {
28 pool_name = get_temp_pool_name();
29 ASSERT_EQ("", create_one_pool_pp(pool_name, rados));
30 ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx));
31 }
32
33 void TearDown() override {
34 ioctx.close();
35 ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados));
36 }
37
38 void test_enqueue(const std::string& queue_name,
39 int number_of_ops,
40 int number_of_elements,
41 int expected_rc) {
42 librados::ObjectWriteOperation op;
43 // test multiple enqueues
44 for (auto i = 0; i < number_of_ops; ++i) {
45 const std::string element_prefix("op-" +to_string(i) + "-element-");
46 std::vector<bufferlist> data(number_of_elements);
47 // create vector of buffer lists
48 std::generate(data.begin(), data.end(), [j = 0, &element_prefix] () mutable {
49 bufferlist bl;
50 bl.append(element_prefix + to_string(j++));
51 return bl;
52 });
53
54 // enqueue vector
55 cls_queue_enqueue(op, 0, data);
56 ASSERT_EQ(expected_rc, ioctx.operate(queue_name, &op));
57 }
58 }
59};
60
61TEST_F(TestClsQueue, GetCapacity)
62{
63 const std::string queue_name = "my-queue";
64 const uint64_t queue_size = 1024*1024;
65 librados::ObjectWriteOperation op;
66 op.create(true);
67 cls_queue_init(op, queue_name, queue_size);
68 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
69
70 uint64_t size;
71 const int ret = cls_queue_get_capacity(ioctx, queue_name, size);
72 ASSERT_EQ(0, ret);
73 ASSERT_EQ(queue_size, size);
74}
75
76TEST_F(TestClsQueue, Enqueue)
77{
78 const std::string queue_name = "my-queue";
79 const uint64_t queue_size = 1024*1024;
80 librados::ObjectWriteOperation op;
81 op.create(true);
82 cls_queue_init(op, queue_name, queue_size);
83 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
84
85 // test multiple enqueues
86 // 10 iterations, 100 elelemts each
87 // expect 0 (OK)
88 test_enqueue(queue_name, 10, 100, 0);
89}
90
91TEST_F(TestClsQueue, QueueFull)
92{
93 const std::string queue_name = "my-queue";
94 const uint64_t queue_size = 1024;
95 librados::ObjectWriteOperation op;
96 op.create(true);
97 cls_queue_init(op, queue_name, queue_size);
98 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
99
100 // 8 iterations, 5 elelemts each
101 // expect 0 (OK)
102 test_enqueue(queue_name, 8, 5, 0);
103 // 2 iterations, 5 elelemts each
104 // expect -28 (Q FULL)
105 test_enqueue(queue_name, 2, 5, -28);
106}
107
108TEST_F(TestClsQueue, List)
109{
110 const std::string queue_name = "my-queue";
111 const uint64_t queue_size = 1024*1024;
112 librados::ObjectWriteOperation op;
113 op.create(true);
114 cls_queue_init(op, queue_name, queue_size);
115 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
116 const auto number_of_ops = 10;
117 const auto number_of_elements = 100;
118
119 // test multiple enqueues
120 test_enqueue(queue_name, number_of_ops, number_of_elements, 0);
121
122 const auto max_elements = 42;
123 std::string marker;
124 bool truncated = false;
125 std::string next_marker;
126 auto total_elements = 0;
127 do {
128 std::vector<cls_queue_entry> entries;
129 const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, next_marker);
130 ASSERT_EQ(0, ret);
131 marker = next_marker;
132 total_elements += entries.size();
133 } while (truncated);
134
135 ASSERT_EQ(total_elements, number_of_ops*number_of_elements);
136}
137
138TEST_F(TestClsQueue, Dequeue)
139{
140 const std::string queue_name = "my-queue";
141 const uint64_t queue_size = 1024*1024;
142 librados::ObjectWriteOperation op;
143 op.create(true);
144 cls_queue_init(op, queue_name, queue_size);
145 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
146
147 // test multiple enqueues
148 test_enqueue(queue_name, 10, 100, 0);
149
150 // get the end marker for 42 elements
151 const auto remove_elements = 42;
152 const std::string marker;
153 bool truncated;
154 std::string end_marker;
155 std::vector<cls_queue_entry> entries;
156 const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, remove_elements, entries, &truncated, end_marker);
157 ASSERT_EQ(0, ret);
158 ASSERT_EQ(truncated, true);
159 // remove up to end marker
160 cls_queue_remove_entries(op, end_marker);
161 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
162}
163
e306af50
TL
164TEST_F(TestClsQueue, DequeueMarker)
165{
166 const std::string queue_name = "my-queue";
167 const uint64_t queue_size = 1024*1024;
168 librados::ObjectWriteOperation op;
169 op.create(true);
170 cls_queue_init(op, queue_name, queue_size);
171 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
172
173 // test multiple enqueues
174 test_enqueue(queue_name, 10, 1000, 0);
175
176 const auto remove_elements = 1024;
177 const std::string marker;
178 bool truncated;
179 std::string end_marker;
180 std::vector<cls_queue_entry> entries;
181 auto ret = cls_queue_list_entries(ioctx, queue_name, marker, remove_elements, entries, &truncated, end_marker);
182 ASSERT_EQ(0, ret);
183 ASSERT_EQ(truncated, true);
184 cls_queue_marker after_deleted_marker;
185 // remove specific markers
186 for (const auto& entry : entries) {
187 cls_queue_marker marker;
188 marker.from_str(entry.marker.c_str());
189 ASSERT_EQ(marker.from_str(entry.marker.c_str()), 0);
190 if (marker.offset > 0 && marker.offset % 2 == 0) {
191 after_deleted_marker = marker;
192 cls_queue_remove_entries(op, marker.to_str());
193 }
194 }
195 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
196 entries.clear();
197 ret = cls_queue_list_entries(ioctx, queue_name, marker, remove_elements, entries, &truncated, end_marker);
198 ASSERT_EQ(0, ret);
199 for (const auto& entry : entries) {
200 cls_queue_marker marker;
201 marker.from_str(entry.marker.c_str());
202 ASSERT_EQ(marker.from_str(entry.marker.c_str()), 0);
203 ASSERT_GE(marker.gen, after_deleted_marker.gen);
204 ASSERT_GE(marker.offset, after_deleted_marker.offset);
205 }
206}
207
9f95a23c
TL
208TEST_F(TestClsQueue, ListEmpty)
209{
210 const std::string queue_name = "my-queue";
211 const uint64_t queue_size = 1024*1024;
212 librados::ObjectWriteOperation op;
213 op.create(true);
214 cls_queue_init(op, queue_name, queue_size);
215 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
216
217 const auto max_elements = 50;
218 const std::string marker;
219 bool truncated;
220 std::string next_marker;
221 std::vector<cls_queue_entry> entries;
222 const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, next_marker);
223 ASSERT_EQ(0, ret);
224 ASSERT_EQ(truncated, false);
225 ASSERT_EQ(entries.size(), 0);
226}
227
228TEST_F(TestClsQueue, DequeueEmpty)
229{
230 const std::string queue_name = "my-queue";
231 const uint64_t queue_size = 1024*1024;
232 librados::ObjectWriteOperation op;
233 op.create(true);
234 cls_queue_init(op, queue_name, queue_size);
235 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
236
237 const auto max_elements = 50;
238 const std::string marker;
239 bool truncated;
240 std::string end_marker;
241 std::vector<cls_queue_entry> entries;
242 const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
243 ASSERT_EQ(0, ret);
244 cls_queue_remove_entries(op, end_marker);
245 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
246}
247
248TEST_F(TestClsQueue, ListAll)
249{
250 const std::string queue_name = "my-queue";
251 const uint64_t queue_size = 1024*1024;
252 librados::ObjectWriteOperation op;
253 op.create(true);
254 cls_queue_init(op, queue_name, queue_size);
255 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
256
257 // test multiple enqueues
258 test_enqueue(queue_name, 10, 100, 0);
259
260 const auto total_elements = 10*100;
261 std::string marker;
262 bool truncated;
263 std::string next_marker;
264 std::vector<cls_queue_entry> entries;
265 const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, total_elements, entries, &truncated, next_marker);
266 ASSERT_EQ(0, ret);
267 ASSERT_EQ(entries.size(), total_elements);
268 ASSERT_EQ(truncated, false);
269}
270
271TEST_F(TestClsQueue, DeleteAll)
272{
273 const std::string queue_name = "my-queue";
274 const uint64_t queue_size = 1024*1024;
275 librados::ObjectWriteOperation op;
276 op.create(true);
277 cls_queue_init(op, queue_name, queue_size);
278 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
279
280 // test multiple enqueues
281 test_enqueue(queue_name, 10, 100, 0);
282
283 const auto total_elements = 10*100;
284 const std::string marker;
285 bool truncated;
286 std::string end_marker;
287 std::vector<cls_queue_entry> entries;
288 auto ret = cls_queue_list_entries(ioctx, queue_name, marker, total_elements, entries, &truncated, end_marker);
289 ASSERT_EQ(0, ret);
290 cls_queue_remove_entries(op, end_marker);
291 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
292 // list again to make sure that queue is empty
293 ret = cls_queue_list_entries(ioctx, queue_name, marker, 10, entries, &truncated, end_marker);
294 ASSERT_EQ(0, ret);
295 ASSERT_EQ(truncated, false);
296 ASSERT_EQ(entries.size(), 0);
297}
298
299TEST_F(TestClsQueue, EnqueueDequeue)
300{
301 const std::string queue_name = "my-queue";
302 const uint64_t queue_size = 1024*1024;
303 librados::ObjectWriteOperation op;
304 op.create(true);
305 cls_queue_init(op, queue_name, queue_size);
306 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
307
308 bool done = false;
309 const int number_of_ops = 10;
310 const int number_of_elements = 100;
311
312 std::thread producer([this, &queue_name, &done] {
313 test_enqueue(queue_name, number_of_ops, number_of_elements, 0);
314 done = true;
315 });
316
317 auto consume_count = 0U;
318 std::thread consumer([this, &queue_name, &consume_count, &done] {
319 librados::ObjectWriteOperation op;
320 const auto max_elements = 42;
321 const std::string marker;
322 bool truncated = false;
323 std::string end_marker;
324 std::vector<cls_queue_entry> entries;
325 while (!done || truncated) {
326 const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
327 ASSERT_EQ(0, ret);
328 consume_count += entries.size();
329 cls_queue_remove_entries(op, end_marker);
330 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
331 }
332 });
333
334 producer.join();
335 consumer.join();
336 ASSERT_EQ(consume_count, number_of_ops*number_of_elements);
337}
338
339TEST_F(TestClsQueue, QueueFullDequeue)
340{
341 const std::string queue_name = "my-queue";
342 const uint64_t queue_size = 4096;
343 librados::ObjectWriteOperation op;
344 op.create(true);
345 cls_queue_init(op, queue_name, queue_size);
346 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
347
348 bool done = false;
349 const auto number_of_ops = 100;
350 const auto number_of_elements = 50;
351
352 std::thread producer([this, &queue_name, &done] {
353 librados::ObjectWriteOperation op;
354 // test multiple enqueues
355 for (auto i = 0; i < number_of_ops; ++i) {
356 const std::string element_prefix("op-" +to_string(i) + "-element-");
357 std::vector<bufferlist> data(number_of_elements);
358 // create vector of buffer lists
359 std::generate(data.begin(), data.end(), [j = 0, &element_prefix] () mutable {
360 bufferlist bl;
361 bl.append(element_prefix + to_string(j++));
362 return bl;
363 });
364
365 // enqueue vector
366 cls_queue_enqueue(op, 0, data);
367 if (ioctx.operate(queue_name, &op) == -28) {
368 // queue is full - wait and retry
369 --i;
370 std::this_thread::sleep_for(std::chrono::milliseconds(100));
371 }
372 }
373 done = true;
374 });
375
376 auto consume_count = 0;
377 std::thread consumer([this, &queue_name, &consume_count, &done] {
378 librados::ObjectWriteOperation op;
379 const auto max_elements = 42;
380 std::string marker;
381 bool truncated = false;
382 std::string end_marker;
383 std::vector<cls_queue_entry> entries;
384 while (!done || truncated) {
385 auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
386 ASSERT_EQ(0, ret);
387 consume_count += entries.size();
388 cls_queue_remove_entries(op, end_marker);
389 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
390 }
391 });
392
393 producer.join();
394 consumer.join();
395 ASSERT_EQ(consume_count, number_of_ops*number_of_elements);
396}
397
398TEST_F(TestClsQueue, MultiProducer)
399{
400 const std::string queue_name = "my-queue";
401 const uint64_t queue_size = 1024*1024;
402 librados::ObjectWriteOperation op;
403 op.create(true);
404 cls_queue_init(op, queue_name, queue_size);
405 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
406
407 const int max_producer_count = 10;
408 int producer_count = max_producer_count;
409 const int number_of_ops = 10;
410 const int number_of_elements = 100;
411
412 std::vector<std::thread> producers(max_producer_count);
413 for (auto& p : producers) {
414 p = std::move(std::thread([this, &queue_name, &producer_count] {
415 test_enqueue(queue_name, number_of_ops, number_of_elements, 0);
416 --producer_count;
417 }));
418 }
419
420 auto consume_count = 0U;
421 std::thread consumer([this, &queue_name, &consume_count, &producer_count] {
422 librados::ObjectWriteOperation op;
423 const auto max_elements = 42;
424 const std::string marker;
425 bool truncated = false;
426 std::string end_marker;
427 std::vector<cls_queue_entry> entries;
428 while (producer_count > 0 || truncated) {
429 const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
430 ASSERT_EQ(0, ret);
431 consume_count += entries.size();
432 cls_queue_remove_entries(op, end_marker);
433 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
434 }
435 });
436
437 for (auto& p : producers) {
438 p.join();
439 }
440 consumer.join();
441 ASSERT_EQ(consume_count, number_of_ops*number_of_elements*max_producer_count);
442}
443
444TEST_F(TestClsQueue, MultiConsumer)
445{
446 const std::string queue_name = "my-queue";
447 const uint64_t queue_size = 1024*1024;
448 librados::ObjectWriteOperation op;
449 op.create(true);
450 cls_queue_init(op, queue_name, queue_size);
451 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
452
453 bool done = false;
454 const int number_of_ops = 10;
455 const int number_of_elements = 100;
456
457 std::thread producer([this, &queue_name, &done] {
458 test_enqueue(queue_name, number_of_ops, number_of_elements, 0);
459 done = true;
460 });
461
462 int consume_count = 0;
463 std::mutex list_and_remove_lock;
464
465 std::vector<std::thread> consumers(10);
466 for (auto& c : consumers) {
467 c = std::thread([this, &queue_name, &consume_count, &done, &list_and_remove_lock] {
468 librados::ObjectWriteOperation op;
469 const auto max_elements = 42;
470 const std::string marker;
471 bool truncated = false;
472 std::string end_marker;
473 std::vector<cls_queue_entry> entries;
474 while (!done || truncated) {
475 std::lock_guard lock(list_and_remove_lock);
476 const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
477 ASSERT_EQ(0, ret);
478 consume_count += entries.size();
479 cls_queue_remove_entries(op, end_marker);
480 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
481 }
482 });
483 }
484
485 producer.join();
486 for (auto& c : consumers) {
487 c.join();
488 }
489 ASSERT_EQ(consume_count, number_of_ops*number_of_elements);
490}
491
492TEST_F(TestClsQueue, NoLockMultiConsumer)
493{
494 const std::string queue_name = "my-queue";
495 const uint64_t queue_size = 1024*1024;
496 librados::ObjectWriteOperation op;
497 op.create(true);
498 cls_queue_init(op, queue_name, queue_size);
499 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
500
501 bool done = false;
502 const int number_of_ops = 10;
503 const int number_of_elements = 100;
504
505 std::thread producer([this, &queue_name, &done] {
506 test_enqueue(queue_name, number_of_ops, number_of_elements, 0);
507 done = true;
508 });
509
510 std::vector<std::thread> consumers(5);
511 for (auto& c : consumers) {
512 c = std::thread([this, &queue_name, &done] {
513 librados::ObjectWriteOperation op;
514 const auto max_elements = 42;
515 const std::string marker;
516 bool truncated = false;
517 std::string end_marker;
518 std::vector<cls_queue_entry> entries;
519 while (!done || truncated) {
520 const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
521 ASSERT_EQ(0, ret);
522 cls_queue_remove_entries(op, end_marker);
523 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
524 }
525 });
526 }
527
528 producer.join();
529 for (auto& c : consumers) {
530 c.join();
531 }
532
533 // make sure queue is empty
534 const auto max_elements = 1000;
535 const std::string marker;
536 bool truncated = false;
537 std::string end_marker;
538 std::vector<cls_queue_entry> entries;
539 const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
540 ASSERT_EQ(0, ret);
541 ASSERT_EQ(entries.size(), 0);
542 ASSERT_EQ(truncated, false);
543}
544