]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/cls_queue/test_cls_queue.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / test / cls_queue / test_cls_queue.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/types.h"
5
6 #include "cls/queue/cls_queue_types.h"
7 #include "cls/queue/cls_queue_client.h"
8 #include "cls/queue/cls_queue_ops.h"
9
10 #include "gtest/gtest.h"
11 #include "test/librados/test_cxx.h"
12 #include "global/global_context.h"
13
14 #include <string>
15 #include <vector>
16 #include <algorithm>
17 #include <thread>
18 #include <chrono>
19 #include <atomic>
20
21 using namespace std;
22
23 class TestClsQueue : public ::testing::Test {
24 protected:
25 librados::Rados rados;
26 std::string pool_name;
27 librados::IoCtx ioctx;
28
29 void SetUp() override {
30 pool_name = get_temp_pool_name();
31 ASSERT_EQ("", create_one_pool_pp(pool_name, rados));
32 ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx));
33 }
34
35 void TearDown() override {
36 ioctx.close();
37 ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados));
38 }
39
40 void test_enqueue(const std::string& queue_name,
41 int number_of_ops,
42 int number_of_elements,
43 int expected_rc) {
44 librados::ObjectWriteOperation op;
45 // test multiple enqueues
46 for (auto i = 0; i < number_of_ops; ++i) {
47 const std::string element_prefix("op-" +to_string(i) + "-element-");
48 std::vector<bufferlist> data(number_of_elements);
49 // create vector of buffer lists
50 std::generate(data.begin(), data.end(), [j = 0, &element_prefix] () mutable {
51 bufferlist bl;
52 bl.append(element_prefix + to_string(j++));
53 return bl;
54 });
55
56 // enqueue vector
57 cls_queue_enqueue(op, 0, data);
58 ASSERT_EQ(expected_rc, ioctx.operate(queue_name, &op));
59 }
60 }
61 };
62
63 TEST_F(TestClsQueue, GetCapacity)
64 {
65 const std::string queue_name = "my-queue";
66 const uint64_t queue_size = 1024*1024;
67 librados::ObjectWriteOperation op;
68 op.create(true);
69 cls_queue_init(op, queue_name, queue_size);
70 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
71
72 uint64_t size;
73 const int ret = cls_queue_get_capacity(ioctx, queue_name, size);
74 ASSERT_EQ(0, ret);
75 ASSERT_EQ(queue_size, size);
76 }
77
78 TEST_F(TestClsQueue, Enqueue)
79 {
80 const std::string queue_name = "my-queue";
81 const uint64_t queue_size = 1024*1024;
82 librados::ObjectWriteOperation op;
83 op.create(true);
84 cls_queue_init(op, queue_name, queue_size);
85 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
86
87 // test multiple enqueues
88 // 10 iterations, 100 elelemts each
89 // expect 0 (OK)
90 test_enqueue(queue_name, 10, 100, 0);
91 }
92
93 TEST_F(TestClsQueue, QueueFull)
94 {
95 const std::string queue_name = "my-queue";
96 const uint64_t queue_size = 1024;
97 librados::ObjectWriteOperation op;
98 op.create(true);
99 cls_queue_init(op, queue_name, queue_size);
100 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
101
102 // 8 iterations, 5 elelemts each
103 // expect 0 (OK)
104 test_enqueue(queue_name, 8, 5, 0);
105 // 2 iterations, 5 elelemts each
106 // expect -28 (Q FULL)
107 test_enqueue(queue_name, 2, 5, -28);
108 }
109
110 TEST_F(TestClsQueue, List)
111 {
112 const std::string queue_name = "my-queue";
113 const uint64_t queue_size = 1024*1024;
114 librados::ObjectWriteOperation op;
115 op.create(true);
116 cls_queue_init(op, queue_name, queue_size);
117 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
118 const auto number_of_ops = 10;
119 const auto number_of_elements = 100;
120
121 // test multiple enqueues
122 test_enqueue(queue_name, number_of_ops, number_of_elements, 0);
123
124 const auto max_elements = 42;
125 std::string marker;
126 bool truncated = false;
127 std::string next_marker;
128 auto total_elements = 0;
129 do {
130 std::vector<cls_queue_entry> entries;
131 const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, next_marker);
132 ASSERT_EQ(0, ret);
133 marker = next_marker;
134 total_elements += entries.size();
135 } while (truncated);
136
137 ASSERT_EQ(total_elements, number_of_ops*number_of_elements);
138 }
139
140 TEST_F(TestClsQueue, Dequeue)
141 {
142 const std::string queue_name = "my-queue";
143 const uint64_t queue_size = 1024*1024;
144 librados::ObjectWriteOperation op;
145 op.create(true);
146 cls_queue_init(op, queue_name, queue_size);
147 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
148
149 // test multiple enqueues
150 test_enqueue(queue_name, 10, 100, 0);
151
152 // get the end marker for 42 elements
153 const auto remove_elements = 42;
154 const std::string marker;
155 bool truncated;
156 std::string end_marker;
157 std::vector<cls_queue_entry> entries;
158 const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, remove_elements, entries, &truncated, end_marker);
159 ASSERT_EQ(0, ret);
160 ASSERT_EQ(truncated, true);
161 // remove up to end marker
162 cls_queue_remove_entries(op, end_marker);
163 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
164 }
165
166 TEST_F(TestClsQueue, DequeueMarker)
167 {
168 const std::string queue_name = "my-queue";
169 const uint64_t queue_size = 1024*1024;
170 librados::ObjectWriteOperation op;
171 op.create(true);
172 cls_queue_init(op, queue_name, queue_size);
173 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
174
175 // test multiple enqueues
176 test_enqueue(queue_name, 10, 1000, 0);
177
178 const auto remove_elements = 1024;
179 const std::string marker;
180 bool truncated;
181 std::string end_marker;
182 std::vector<cls_queue_entry> entries;
183 auto ret = cls_queue_list_entries(ioctx, queue_name, marker, remove_elements, entries, &truncated, end_marker);
184 ASSERT_EQ(0, ret);
185 ASSERT_EQ(truncated, true);
186 cls_queue_marker after_deleted_marker;
187 // remove specific markers
188 for (const auto& entry : entries) {
189 cls_queue_marker marker;
190 marker.from_str(entry.marker.c_str());
191 ASSERT_EQ(marker.from_str(entry.marker.c_str()), 0);
192 if (marker.offset > 0 && marker.offset % 2 == 0) {
193 after_deleted_marker = marker;
194 cls_queue_remove_entries(op, marker.to_str());
195 }
196 }
197 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
198 entries.clear();
199 ret = cls_queue_list_entries(ioctx, queue_name, marker, remove_elements, entries, &truncated, end_marker);
200 ASSERT_EQ(0, ret);
201 for (const auto& entry : entries) {
202 cls_queue_marker marker;
203 marker.from_str(entry.marker.c_str());
204 ASSERT_EQ(marker.from_str(entry.marker.c_str()), 0);
205 ASSERT_GE(marker.gen, after_deleted_marker.gen);
206 ASSERT_GE(marker.offset, after_deleted_marker.offset);
207 }
208 }
209
210 TEST_F(TestClsQueue, ListEmpty)
211 {
212 const std::string queue_name = "my-queue";
213 const uint64_t queue_size = 1024*1024;
214 librados::ObjectWriteOperation op;
215 op.create(true);
216 cls_queue_init(op, queue_name, queue_size);
217 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
218
219 const auto max_elements = 50;
220 const std::string marker;
221 bool truncated;
222 std::string next_marker;
223 std::vector<cls_queue_entry> entries;
224 const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, next_marker);
225 ASSERT_EQ(0, ret);
226 ASSERT_EQ(truncated, false);
227 ASSERT_EQ(entries.size(), 0);
228 }
229
230 TEST_F(TestClsQueue, DequeueEmpty)
231 {
232 const std::string queue_name = "my-queue";
233 const uint64_t queue_size = 1024*1024;
234 librados::ObjectWriteOperation op;
235 op.create(true);
236 cls_queue_init(op, queue_name, queue_size);
237 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
238
239 const auto max_elements = 50;
240 const std::string marker;
241 bool truncated;
242 std::string end_marker;
243 std::vector<cls_queue_entry> entries;
244 const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
245 ASSERT_EQ(0, ret);
246 cls_queue_remove_entries(op, end_marker);
247 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
248 }
249
250 TEST_F(TestClsQueue, ListAll)
251 {
252 const std::string queue_name = "my-queue";
253 const uint64_t queue_size = 1024*1024;
254 librados::ObjectWriteOperation op;
255 op.create(true);
256 cls_queue_init(op, queue_name, queue_size);
257 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
258
259 // test multiple enqueues
260 test_enqueue(queue_name, 10, 100, 0);
261
262 const auto total_elements = 10*100;
263 std::string marker;
264 bool truncated;
265 std::string next_marker;
266 std::vector<cls_queue_entry> entries;
267 const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, total_elements, entries, &truncated, next_marker);
268 ASSERT_EQ(0, ret);
269 ASSERT_EQ(entries.size(), total_elements);
270 ASSERT_EQ(truncated, false);
271 }
272
273 TEST_F(TestClsQueue, DeleteAll)
274 {
275 const std::string queue_name = "my-queue";
276 const uint64_t queue_size = 1024*1024;
277 librados::ObjectWriteOperation op;
278 op.create(true);
279 cls_queue_init(op, queue_name, queue_size);
280 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
281
282 // test multiple enqueues
283 test_enqueue(queue_name, 10, 100, 0);
284
285 const auto total_elements = 10*100;
286 const std::string marker;
287 bool truncated;
288 std::string end_marker;
289 std::vector<cls_queue_entry> entries;
290 auto ret = cls_queue_list_entries(ioctx, queue_name, marker, total_elements, entries, &truncated, end_marker);
291 ASSERT_EQ(0, ret);
292 cls_queue_remove_entries(op, end_marker);
293 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
294 // list again to make sure that queue is empty
295 ret = cls_queue_list_entries(ioctx, queue_name, marker, 10, entries, &truncated, end_marker);
296 ASSERT_EQ(0, ret);
297 ASSERT_EQ(truncated, false);
298 ASSERT_EQ(entries.size(), 0);
299 }
300
301 TEST_F(TestClsQueue, EnqueueDequeue)
302 {
303 const std::string queue_name = "my-queue";
304 const uint64_t queue_size = 1024*1024;
305 librados::ObjectWriteOperation op;
306 op.create(true);
307 cls_queue_init(op, queue_name, queue_size);
308 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
309
310 bool done = false;
311 const int number_of_ops = 10;
312 const int number_of_elements = 100;
313
314 std::thread producer([this, &queue_name, &done] {
315 test_enqueue(queue_name, number_of_ops, number_of_elements, 0);
316 done = true;
317 });
318
319 auto consume_count = 0U;
320 std::thread consumer([this, &queue_name, &consume_count, &done] {
321 librados::ObjectWriteOperation op;
322 const auto max_elements = 42;
323 const std::string marker;
324 bool truncated = false;
325 std::string end_marker;
326 std::vector<cls_queue_entry> entries;
327 while (!done || truncated) {
328 const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
329 ASSERT_EQ(0, ret);
330 consume_count += entries.size();
331 cls_queue_remove_entries(op, end_marker);
332 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
333 }
334 });
335
336 producer.join();
337 consumer.join();
338 ASSERT_EQ(consume_count, number_of_ops*number_of_elements);
339 }
340
341 TEST_F(TestClsQueue, QueueFullDequeue)
342 {
343 const std::string queue_name = "my-queue";
344 const uint64_t queue_size = 4096;
345 librados::ObjectWriteOperation op;
346 op.create(true);
347 cls_queue_init(op, queue_name, queue_size);
348 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
349
350 bool done = false;
351 const auto number_of_ops = 100;
352 const auto number_of_elements = 50;
353
354 std::thread producer([this, &queue_name, &done] {
355 librados::ObjectWriteOperation op;
356 // test multiple enqueues
357 for (auto i = 0; i < number_of_ops; ++i) {
358 const std::string element_prefix("op-" +to_string(i) + "-element-");
359 std::vector<bufferlist> data(number_of_elements);
360 // create vector of buffer lists
361 std::generate(data.begin(), data.end(), [j = 0, &element_prefix] () mutable {
362 bufferlist bl;
363 bl.append(element_prefix + to_string(j++));
364 return bl;
365 });
366
367 // enqueue vector
368 cls_queue_enqueue(op, 0, data);
369 if (ioctx.operate(queue_name, &op) == -28) {
370 // queue is full - wait and retry
371 --i;
372 std::this_thread::sleep_for(std::chrono::milliseconds(100));
373 }
374 }
375 done = true;
376 });
377
378 auto consume_count = 0;
379 std::thread consumer([this, &queue_name, &consume_count, &done] {
380 librados::ObjectWriteOperation op;
381 const auto max_elements = 42;
382 std::string marker;
383 bool truncated = false;
384 std::string end_marker;
385 std::vector<cls_queue_entry> entries;
386 while (!done || truncated) {
387 auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
388 ASSERT_EQ(0, ret);
389 consume_count += entries.size();
390 cls_queue_remove_entries(op, end_marker);
391 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
392 }
393 });
394
395 producer.join();
396 consumer.join();
397 ASSERT_EQ(consume_count, number_of_ops*number_of_elements);
398 }
399
400 TEST_F(TestClsQueue, MultiProducer)
401 {
402 const std::string queue_name = "my-queue";
403 const uint64_t queue_size = 1024*1024;
404 librados::ObjectWriteOperation op;
405 op.create(true);
406 cls_queue_init(op, queue_name, queue_size);
407 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
408
409 const int max_producer_count = 10;
410 int producer_count = max_producer_count;
411 const int number_of_ops = 10;
412 const int number_of_elements = 100;
413
414 std::vector<std::thread> producers(max_producer_count);
415 for (auto& p : producers) {
416 p = std::thread([this, &queue_name, &producer_count] {
417 test_enqueue(queue_name, number_of_ops, number_of_elements, 0);
418 --producer_count;
419 });
420 }
421
422 auto consume_count = 0U;
423 std::thread consumer([this, &queue_name, &consume_count, &producer_count] {
424 librados::ObjectWriteOperation op;
425 const auto max_elements = 42;
426 const std::string marker;
427 bool truncated = false;
428 std::string end_marker;
429 std::vector<cls_queue_entry> entries;
430 while (producer_count > 0 || truncated) {
431 const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
432 ASSERT_EQ(0, ret);
433 consume_count += entries.size();
434 cls_queue_remove_entries(op, end_marker);
435 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
436 }
437 });
438
439 for (auto& p : producers) {
440 p.join();
441 }
442 consumer.join();
443 ASSERT_EQ(consume_count, number_of_ops*number_of_elements*max_producer_count);
444 }
445
446 TEST_F(TestClsQueue, MultiConsumer)
447 {
448 const std::string queue_name = "my-queue";
449 const uint64_t queue_size = 1024*1024;
450 librados::ObjectWriteOperation op;
451 op.create(true);
452 cls_queue_init(op, queue_name, queue_size);
453 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
454
455 bool done = false;
456 const int number_of_ops = 10;
457 const int number_of_elements = 100;
458
459 std::thread producer([this, &queue_name, &done] {
460 test_enqueue(queue_name, number_of_ops, number_of_elements, 0);
461 done = true;
462 });
463
464 int consume_count = 0;
465 std::mutex list_and_remove_lock;
466
467 std::vector<std::thread> consumers(10);
468 for (auto& c : consumers) {
469 c = std::thread([this, &queue_name, &consume_count, &done, &list_and_remove_lock] {
470 librados::ObjectWriteOperation op;
471 const auto max_elements = 42;
472 const std::string marker;
473 bool truncated = false;
474 std::string end_marker;
475 std::vector<cls_queue_entry> entries;
476 while (!done || truncated) {
477 std::lock_guard lock(list_and_remove_lock);
478 const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
479 ASSERT_EQ(0, ret);
480 consume_count += entries.size();
481 cls_queue_remove_entries(op, end_marker);
482 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
483 }
484 });
485 }
486
487 producer.join();
488 for (auto& c : consumers) {
489 c.join();
490 }
491 ASSERT_EQ(consume_count, number_of_ops*number_of_elements);
492 }
493
494 TEST_F(TestClsQueue, NoLockMultiConsumer)
495 {
496 const std::string queue_name = "my-queue";
497 const uint64_t queue_size = 1024*1024;
498 librados::ObjectWriteOperation op;
499 op.create(true);
500 cls_queue_init(op, queue_name, queue_size);
501 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
502
503 bool done = false;
504 const int number_of_ops = 10;
505 const int number_of_elements = 100;
506
507 std::thread producer([this, &queue_name, &done] {
508 test_enqueue(queue_name, number_of_ops, number_of_elements, 0);
509 done = true;
510 });
511
512 std::vector<std::thread> consumers(5);
513 for (auto& c : consumers) {
514 c = std::thread([this, &queue_name, &done] {
515 librados::ObjectWriteOperation op;
516 const auto max_elements = 42;
517 const std::string marker;
518 bool truncated = false;
519 std::string end_marker;
520 std::vector<cls_queue_entry> entries;
521 while (!done || truncated) {
522 const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
523 ASSERT_EQ(0, ret);
524 cls_queue_remove_entries(op, end_marker);
525 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
526 }
527 });
528 }
529
530 producer.join();
531 for (auto& c : consumers) {
532 c.join();
533 }
534
535 // make sure queue is empty
536 const auto max_elements = 1000;
537 const std::string marker;
538 bool truncated = false;
539 std::string end_marker;
540 std::vector<cls_queue_entry> entries;
541 const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
542 ASSERT_EQ(0, ret);
543 ASSERT_EQ(entries.size(), 0);
544 ASSERT_EQ(truncated, false);
545 }
546
547 TEST_F(TestClsQueue, WrapAround)
548 {
549 const std::string queue_name = "my-queue";
550 const auto number_of_entries = 10U;
551 const auto max_entry_size = 2000;
552 const auto min_entry_size = 1000;
553 const uint64_t queue_size = number_of_entries*max_entry_size;
554 const auto entry_overhead = 10;
555 librados::ObjectWriteOperation op;
556 op.create(true);
557 cls_queue_init(op, queue_name, queue_size);
558 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
559
560 std::list<bufferlist> total_bl;
561
562 // fill up the queue
563 for (auto i = 0U; i < number_of_entries; ++i) {
564 const auto entry_size = rand()%(max_entry_size - min_entry_size + 1) + min_entry_size;
565 std::string entry_str(entry_size-entry_overhead, 0);
566 std::generate_n(entry_str.begin(), entry_str.size(), [](){return (char)(rand());});
567 bufferlist entry_bl;
568 entry_bl.append(entry_str);
569 std::vector<bufferlist> data{{entry_bl}};
570 // enqueue vector
571 cls_queue_enqueue(op, 0, data);
572 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
573 total_bl.push_back(entry_bl);
574 }
575
576 std::string marker;
577 for (auto j = 0; j < 10; ++j) {
578 // empty half+1 of the queue
579 const auto max_elements = number_of_entries/2 + 1;
580 bool truncated;
581 std::string end_marker;
582 std::vector<cls_queue_entry> entries;
583 const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
584 ASSERT_EQ(0, ret);
585 for (auto& entry : entries) {
586 ASSERT_EQ(entry.data, total_bl.front());
587 total_bl.pop_front();
588 }
589 marker = end_marker;
590 cls_queue_remove_entries(op, end_marker);
591 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
592
593 // fill half+1 of the queue
594 for (auto i = 0U; i < number_of_entries/2 + 1; ++i) {
595 const auto entry_size = rand()%(max_entry_size - min_entry_size + 1) + min_entry_size;
596 std::string entry_str(entry_size-entry_overhead, 0);
597 std::generate_n(entry_str.begin(), entry_str.size(), [](){return (char)(rand());});
598 bufferlist entry_bl;
599 entry_bl.append(entry_str);
600 std::vector<bufferlist> data{{entry_bl}};
601 cls_queue_enqueue(op, 0, data);
602 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
603 total_bl.push_back(entry_bl);
604 }
605 }
606 }
607