1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/types.h"
6 #include "cls/queue/cls_queue_types.h"
7 #include "cls/queue/cls_queue_client.h"
8 #include "cls/queue/cls_queue_ops.h"
10 #include "gtest/gtest.h"
11 #include "test/librados/test_cxx.h"
12 #include "global/global_context.h"
23 class TestClsQueue
: public ::testing::Test
{
25 librados::Rados rados
;
26 std::string pool_name
;
27 librados::IoCtx ioctx
;
29 void SetUp() override
{
30 pool_name
= get_temp_pool_name();
31 ASSERT_EQ("", create_one_pool_pp(pool_name
, rados
));
32 ASSERT_EQ(0, rados
.ioctx_create(pool_name
.c_str(), ioctx
));
35 void TearDown() override
{
37 ASSERT_EQ(0, destroy_one_pool_pp(pool_name
, rados
));
40 void test_enqueue(const std::string
& queue_name
,
42 int number_of_elements
,
44 librados::ObjectWriteOperation op
;
45 // test multiple enqueues
46 for (auto i
= 0; i
< number_of_ops
; ++i
) {
47 const std::string
element_prefix("op-" +to_string(i
) + "-element-");
48 std::vector
<bufferlist
> data(number_of_elements
);
49 // create vector of buffer lists
50 std::generate(data
.begin(), data
.end(), [j
= 0, &element_prefix
] () mutable {
52 bl
.append(element_prefix
+ to_string(j
++));
57 cls_queue_enqueue(op
, 0, data
);
58 ASSERT_EQ(expected_rc
, ioctx
.operate(queue_name
, &op
));
63 TEST_F(TestClsQueue
, GetCapacity
)
65 const std::string queue_name
= "my-queue";
66 const uint64_t queue_size
= 1024*1024;
67 librados::ObjectWriteOperation op
;
69 cls_queue_init(op
, queue_name
, queue_size
);
70 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
73 const int ret
= cls_queue_get_capacity(ioctx
, queue_name
, size
);
75 ASSERT_EQ(queue_size
, size
);
78 TEST_F(TestClsQueue
, Enqueue
)
80 const std::string queue_name
= "my-queue";
81 const uint64_t queue_size
= 1024*1024;
82 librados::ObjectWriteOperation op
;
84 cls_queue_init(op
, queue_name
, queue_size
);
85 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
87 // test multiple enqueues
88 // 10 iterations, 100 elelemts each
90 test_enqueue(queue_name
, 10, 100, 0);
93 TEST_F(TestClsQueue
, QueueFull
)
95 const std::string queue_name
= "my-queue";
96 const uint64_t queue_size
= 1024;
97 librados::ObjectWriteOperation op
;
99 cls_queue_init(op
, queue_name
, queue_size
);
100 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
102 // 8 iterations, 5 elelemts each
104 test_enqueue(queue_name
, 8, 5, 0);
105 // 2 iterations, 5 elelemts each
106 // expect -28 (Q FULL)
107 test_enqueue(queue_name
, 2, 5, -28);
110 TEST_F(TestClsQueue
, List
)
112 const std::string queue_name
= "my-queue";
113 const uint64_t queue_size
= 1024*1024;
114 librados::ObjectWriteOperation op
;
116 cls_queue_init(op
, queue_name
, queue_size
);
117 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
118 const auto number_of_ops
= 10;
119 const auto number_of_elements
= 100;
121 // test multiple enqueues
122 test_enqueue(queue_name
, number_of_ops
, number_of_elements
, 0);
124 const auto max_elements
= 42;
126 bool truncated
= false;
127 std::string next_marker
;
128 auto total_elements
= 0;
130 std::vector
<cls_queue_entry
> entries
;
131 const auto ret
= cls_queue_list_entries(ioctx
, queue_name
, marker
, max_elements
, entries
, &truncated
, next_marker
);
133 marker
= next_marker
;
134 total_elements
+= entries
.size();
137 ASSERT_EQ(total_elements
, number_of_ops
*number_of_elements
);
140 TEST_F(TestClsQueue
, Dequeue
)
142 const std::string queue_name
= "my-queue";
143 const uint64_t queue_size
= 1024*1024;
144 librados::ObjectWriteOperation op
;
146 cls_queue_init(op
, queue_name
, queue_size
);
147 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
149 // test multiple enqueues
150 test_enqueue(queue_name
, 10, 100, 0);
152 // get the end marker for 42 elements
153 const auto remove_elements
= 42;
154 const std::string marker
;
156 std::string end_marker
;
157 std::vector
<cls_queue_entry
> entries
;
158 const auto ret
= cls_queue_list_entries(ioctx
, queue_name
, marker
, remove_elements
, entries
, &truncated
, end_marker
);
160 ASSERT_EQ(truncated
, true);
161 // remove up to end marker
162 cls_queue_remove_entries(op
, end_marker
);
163 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
166 TEST_F(TestClsQueue
, DequeueMarker
)
168 const std::string queue_name
= "my-queue";
169 const uint64_t queue_size
= 1024*1024;
170 librados::ObjectWriteOperation op
;
172 cls_queue_init(op
, queue_name
, queue_size
);
173 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
175 // test multiple enqueues
176 test_enqueue(queue_name
, 10, 1000, 0);
178 const auto remove_elements
= 1024;
179 const std::string marker
;
181 std::string end_marker
;
182 std::vector
<cls_queue_entry
> entries
;
183 auto ret
= cls_queue_list_entries(ioctx
, queue_name
, marker
, remove_elements
, entries
, &truncated
, end_marker
);
185 ASSERT_EQ(truncated
, true);
186 cls_queue_marker after_deleted_marker
;
187 // remove specific markers
188 for (const auto& entry
: entries
) {
189 cls_queue_marker marker
;
190 marker
.from_str(entry
.marker
.c_str());
191 ASSERT_EQ(marker
.from_str(entry
.marker
.c_str()), 0);
192 if (marker
.offset
> 0 && marker
.offset
% 2 == 0) {
193 after_deleted_marker
= marker
;
194 cls_queue_remove_entries(op
, marker
.to_str());
197 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
199 ret
= cls_queue_list_entries(ioctx
, queue_name
, marker
, remove_elements
, entries
, &truncated
, end_marker
);
201 for (const auto& entry
: entries
) {
202 cls_queue_marker marker
;
203 marker
.from_str(entry
.marker
.c_str());
204 ASSERT_EQ(marker
.from_str(entry
.marker
.c_str()), 0);
205 ASSERT_GE(marker
.gen
, after_deleted_marker
.gen
);
206 ASSERT_GE(marker
.offset
, after_deleted_marker
.offset
);
210 TEST_F(TestClsQueue
, ListEmpty
)
212 const std::string queue_name
= "my-queue";
213 const uint64_t queue_size
= 1024*1024;
214 librados::ObjectWriteOperation op
;
216 cls_queue_init(op
, queue_name
, queue_size
);
217 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
219 const auto max_elements
= 50;
220 const std::string marker
;
222 std::string next_marker
;
223 std::vector
<cls_queue_entry
> entries
;
224 const auto ret
= cls_queue_list_entries(ioctx
, queue_name
, marker
, max_elements
, entries
, &truncated
, next_marker
);
226 ASSERT_EQ(truncated
, false);
227 ASSERT_EQ(entries
.size(), 0);
230 TEST_F(TestClsQueue
, DequeueEmpty
)
232 const std::string queue_name
= "my-queue";
233 const uint64_t queue_size
= 1024*1024;
234 librados::ObjectWriteOperation op
;
236 cls_queue_init(op
, queue_name
, queue_size
);
237 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
239 const auto max_elements
= 50;
240 const std::string marker
;
242 std::string end_marker
;
243 std::vector
<cls_queue_entry
> entries
;
244 const auto ret
= cls_queue_list_entries(ioctx
, queue_name
, marker
, max_elements
, entries
, &truncated
, end_marker
);
246 cls_queue_remove_entries(op
, end_marker
);
247 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
250 TEST_F(TestClsQueue
, ListAll
)
252 const std::string queue_name
= "my-queue";
253 const uint64_t queue_size
= 1024*1024;
254 librados::ObjectWriteOperation op
;
256 cls_queue_init(op
, queue_name
, queue_size
);
257 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
259 // test multiple enqueues
260 test_enqueue(queue_name
, 10, 100, 0);
262 const auto total_elements
= 10*100;
265 std::string next_marker
;
266 std::vector
<cls_queue_entry
> entries
;
267 const auto ret
= cls_queue_list_entries(ioctx
, queue_name
, marker
, total_elements
, entries
, &truncated
, next_marker
);
269 ASSERT_EQ(entries
.size(), total_elements
);
270 ASSERT_EQ(truncated
, false);
273 TEST_F(TestClsQueue
, DeleteAll
)
275 const std::string queue_name
= "my-queue";
276 const uint64_t queue_size
= 1024*1024;
277 librados::ObjectWriteOperation op
;
279 cls_queue_init(op
, queue_name
, queue_size
);
280 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
282 // test multiple enqueues
283 test_enqueue(queue_name
, 10, 100, 0);
285 const auto total_elements
= 10*100;
286 const std::string marker
;
288 std::string end_marker
;
289 std::vector
<cls_queue_entry
> entries
;
290 auto ret
= cls_queue_list_entries(ioctx
, queue_name
, marker
, total_elements
, entries
, &truncated
, end_marker
);
292 cls_queue_remove_entries(op
, end_marker
);
293 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
294 // list again to make sure that queue is empty
295 ret
= cls_queue_list_entries(ioctx
, queue_name
, marker
, 10, entries
, &truncated
, end_marker
);
297 ASSERT_EQ(truncated
, false);
298 ASSERT_EQ(entries
.size(), 0);
301 TEST_F(TestClsQueue
, EnqueueDequeue
)
303 const std::string queue_name
= "my-queue";
304 const uint64_t queue_size
= 1024*1024;
305 librados::ObjectWriteOperation op
;
307 cls_queue_init(op
, queue_name
, queue_size
);
308 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
311 const int number_of_ops
= 10;
312 const int number_of_elements
= 100;
314 std::thread
producer([this, &queue_name
, &done
] {
315 test_enqueue(queue_name
, number_of_ops
, number_of_elements
, 0);
319 auto consume_count
= 0U;
320 std::thread
consumer([this, &queue_name
, &consume_count
, &done
] {
321 librados::ObjectWriteOperation op
;
322 const auto max_elements
= 42;
323 const std::string marker
;
324 bool truncated
= false;
325 std::string end_marker
;
326 std::vector
<cls_queue_entry
> entries
;
327 while (!done
|| truncated
) {
328 const auto ret
= cls_queue_list_entries(ioctx
, queue_name
, marker
, max_elements
, entries
, &truncated
, end_marker
);
330 consume_count
+= entries
.size();
331 cls_queue_remove_entries(op
, end_marker
);
332 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
338 ASSERT_EQ(consume_count
, number_of_ops
*number_of_elements
);
341 TEST_F(TestClsQueue
, QueueFullDequeue
)
343 const std::string queue_name
= "my-queue";
344 const uint64_t queue_size
= 4096;
345 librados::ObjectWriteOperation op
;
347 cls_queue_init(op
, queue_name
, queue_size
);
348 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
351 const auto number_of_ops
= 100;
352 const auto number_of_elements
= 50;
354 std::thread
producer([this, &queue_name
, &done
] {
355 librados::ObjectWriteOperation op
;
356 // test multiple enqueues
357 for (auto i
= 0; i
< number_of_ops
; ++i
) {
358 const std::string
element_prefix("op-" +to_string(i
) + "-element-");
359 std::vector
<bufferlist
> data(number_of_elements
);
360 // create vector of buffer lists
361 std::generate(data
.begin(), data
.end(), [j
= 0, &element_prefix
] () mutable {
363 bl
.append(element_prefix
+ to_string(j
++));
368 cls_queue_enqueue(op
, 0, data
);
369 if (ioctx
.operate(queue_name
, &op
) == -28) {
370 // queue is full - wait and retry
372 std::this_thread::sleep_for(std::chrono::milliseconds(100));
378 auto consume_count
= 0;
379 std::thread
consumer([this, &queue_name
, &consume_count
, &done
] {
380 librados::ObjectWriteOperation op
;
381 const auto max_elements
= 42;
383 bool truncated
= false;
384 std::string end_marker
;
385 std::vector
<cls_queue_entry
> entries
;
386 while (!done
|| truncated
) {
387 auto ret
= cls_queue_list_entries(ioctx
, queue_name
, marker
, max_elements
, entries
, &truncated
, end_marker
);
389 consume_count
+= entries
.size();
390 cls_queue_remove_entries(op
, end_marker
);
391 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
397 ASSERT_EQ(consume_count
, number_of_ops
*number_of_elements
);
400 TEST_F(TestClsQueue
, MultiProducer
)
402 const std::string queue_name
= "my-queue";
403 const uint64_t queue_size
= 1024*1024;
404 librados::ObjectWriteOperation op
;
406 cls_queue_init(op
, queue_name
, queue_size
);
407 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
409 const int max_producer_count
= 10;
410 int producer_count
= max_producer_count
;
411 const int number_of_ops
= 10;
412 const int number_of_elements
= 100;
414 std::vector
<std::thread
> producers(max_producer_count
);
415 for (auto& p
: producers
) {
416 p
= std::thread([this, &queue_name
, &producer_count
] {
417 test_enqueue(queue_name
, number_of_ops
, number_of_elements
, 0);
422 auto consume_count
= 0U;
423 std::thread
consumer([this, &queue_name
, &consume_count
, &producer_count
] {
424 librados::ObjectWriteOperation op
;
425 const auto max_elements
= 42;
426 const std::string marker
;
427 bool truncated
= false;
428 std::string end_marker
;
429 std::vector
<cls_queue_entry
> entries
;
430 while (producer_count
> 0 || truncated
) {
431 const auto ret
= cls_queue_list_entries(ioctx
, queue_name
, marker
, max_elements
, entries
, &truncated
, end_marker
);
433 consume_count
+= entries
.size();
434 cls_queue_remove_entries(op
, end_marker
);
435 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
439 for (auto& p
: producers
) {
443 ASSERT_EQ(consume_count
, number_of_ops
*number_of_elements
*max_producer_count
);
446 TEST_F(TestClsQueue
, MultiConsumer
)
448 const std::string queue_name
= "my-queue";
449 const uint64_t queue_size
= 1024*1024;
450 librados::ObjectWriteOperation op
;
452 cls_queue_init(op
, queue_name
, queue_size
);
453 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
456 const int number_of_ops
= 10;
457 const int number_of_elements
= 100;
459 std::thread
producer([this, &queue_name
, &done
] {
460 test_enqueue(queue_name
, number_of_ops
, number_of_elements
, 0);
464 int consume_count
= 0;
465 std::mutex list_and_remove_lock
;
467 std::vector
<std::thread
> consumers(10);
468 for (auto& c
: consumers
) {
469 c
= std::thread([this, &queue_name
, &consume_count
, &done
, &list_and_remove_lock
] {
470 librados::ObjectWriteOperation op
;
471 const auto max_elements
= 42;
472 const std::string marker
;
473 bool truncated
= false;
474 std::string end_marker
;
475 std::vector
<cls_queue_entry
> entries
;
476 while (!done
|| truncated
) {
477 std::lock_guard
lock(list_and_remove_lock
);
478 const auto ret
= cls_queue_list_entries(ioctx
, queue_name
, marker
, max_elements
, entries
, &truncated
, end_marker
);
480 consume_count
+= entries
.size();
481 cls_queue_remove_entries(op
, end_marker
);
482 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
488 for (auto& c
: consumers
) {
491 ASSERT_EQ(consume_count
, number_of_ops
*number_of_elements
);
494 TEST_F(TestClsQueue
, NoLockMultiConsumer
)
496 const std::string queue_name
= "my-queue";
497 const uint64_t queue_size
= 1024*1024;
498 librados::ObjectWriteOperation op
;
500 cls_queue_init(op
, queue_name
, queue_size
);
501 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
504 const int number_of_ops
= 10;
505 const int number_of_elements
= 100;
507 std::thread
producer([this, &queue_name
, &done
] {
508 test_enqueue(queue_name
, number_of_ops
, number_of_elements
, 0);
512 std::vector
<std::thread
> consumers(5);
513 for (auto& c
: consumers
) {
514 c
= std::thread([this, &queue_name
, &done
] {
515 librados::ObjectWriteOperation op
;
516 const auto max_elements
= 42;
517 const std::string marker
;
518 bool truncated
= false;
519 std::string end_marker
;
520 std::vector
<cls_queue_entry
> entries
;
521 while (!done
|| truncated
) {
522 const auto ret
= cls_queue_list_entries(ioctx
, queue_name
, marker
, max_elements
, entries
, &truncated
, end_marker
);
524 cls_queue_remove_entries(op
, end_marker
);
525 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
531 for (auto& c
: consumers
) {
535 // make sure queue is empty
536 const auto max_elements
= 1000;
537 const std::string marker
;
538 bool truncated
= false;
539 std::string end_marker
;
540 std::vector
<cls_queue_entry
> entries
;
541 const auto ret
= cls_queue_list_entries(ioctx
, queue_name
, marker
, max_elements
, entries
, &truncated
, end_marker
);
543 ASSERT_EQ(entries
.size(), 0);
544 ASSERT_EQ(truncated
, false);
547 TEST_F(TestClsQueue
, WrapAround
)
549 const std::string queue_name
= "my-queue";
550 const auto number_of_entries
= 10U;
551 const auto max_entry_size
= 2000;
552 const auto min_entry_size
= 1000;
553 const uint64_t queue_size
= number_of_entries
*max_entry_size
;
554 const auto entry_overhead
= 10;
555 librados::ObjectWriteOperation op
;
557 cls_queue_init(op
, queue_name
, queue_size
);
558 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
560 std::list
<bufferlist
> total_bl
;
563 for (auto i
= 0U; i
< number_of_entries
; ++i
) {
564 const auto entry_size
= rand()%(max_entry_size
- min_entry_size
+ 1) + min_entry_size
;
565 std::string
entry_str(entry_size
-entry_overhead
, 0);
566 std::generate_n(entry_str
.begin(), entry_str
.size(), [](){return (char)(rand());});
568 entry_bl
.append(entry_str
);
569 std::vector
<bufferlist
> data
{{entry_bl
}};
571 cls_queue_enqueue(op
, 0, data
);
572 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
573 total_bl
.push_back(entry_bl
);
577 for (auto j
= 0; j
< 10; ++j
) {
578 // empty half+1 of the queue
579 const auto max_elements
= number_of_entries
/2 + 1;
581 std::string end_marker
;
582 std::vector
<cls_queue_entry
> entries
;
583 const auto ret
= cls_queue_list_entries(ioctx
, queue_name
, marker
, max_elements
, entries
, &truncated
, end_marker
);
585 for (auto& entry
: entries
) {
586 ASSERT_EQ(entry
.data
, total_bl
.front());
587 total_bl
.pop_front();
590 cls_queue_remove_entries(op
, end_marker
);
591 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
593 // fill half+1 of the queue
594 for (auto i
= 0U; i
< number_of_entries
/2 + 1; ++i
) {
595 const auto entry_size
= rand()%(max_entry_size
- min_entry_size
+ 1) + min_entry_size
;
596 std::string
entry_str(entry_size
-entry_overhead
, 0);
597 std::generate_n(entry_str
.begin(), entry_str
.size(), [](){return (char)(rand());});
599 entry_bl
.append(entry_str
);
600 std::vector
<bufferlist
> data
{{entry_bl
}};
601 cls_queue_enqueue(op
, 0, data
);
602 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
603 total_bl
.push_back(entry_bl
);