1 // -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/types.h"
6 #include "cls/2pc_queue/cls_2pc_queue_types.h"
7 #include "cls/2pc_queue/cls_2pc_queue_client.h"
8 #include "cls/queue/cls_queue_client.h"
9 #include "cls/2pc_queue/cls_2pc_queue_types.h"
11 #include "gtest/gtest.h"
12 #include "test/librados/test_cxx.h"
13 #include "global/global_context.h"
// Test fixture for the cls_2pc_queue tests: holds the Rados handle, the name
// of a temporary pool, and the IoCtx opened on that pool.
// NOTE(review): closing braces and possibly other lines are not visible in
// this listing - confirm against the original file.
24 class TestCls2PCQueue
: public ::testing::Test
{
26 librados::Rados rados
;
27 std::string pool_name
;
28 librados::IoCtx ioctx
;
// Picks a temporary pool name, creates the pool, and opens `ioctx` on it.
30 void SetUp() override
{
31 pool_name
= get_temp_pool_name();
32 ASSERT_EQ("", create_one_pool_pp(pool_name
, rados
));
33 ASSERT_EQ(0, rados
.ioctx_create(pool_name
.c_str(), ioctx
));
// Destroys the pool that SetUp created.
36 void TearDown() override
{
38 ASSERT_EQ(0, destroy_one_pool_pp(pool_name
, rados
));
// Initializes a queue with max_size = 8*1024 and checks that
// cls_queue_get_capacity() reports the same value.
// NOTE(review): `size` is used without a visible declaration and `ret` is
// not visibly checked; this listing appears to omit lines - confirm.
42 TEST_F(TestCls2PCQueue
, GetCapacity
)
44 const std::string queue_name
= __PRETTY_FUNCTION__
;
45 const auto max_size
= 8*1024;
46 librados::ObjectWriteOperation op
;
48 cls_2pc_queue_init(op
, queue_name
, max_size
);
49 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
53 const int ret
= cls_queue_get_capacity(ioctx
, queue_name
, size
);
55 ASSERT_EQ(max_size
, size
);
// Same capacity check as GetCapacity, but the request is queued on an
// ObjectReadOperation and the reply buffer is decoded with
// cls_2pc_queue_get_capacity_result().
// NOTE(review): declarations of `bl`, `rc` and `size` are not visible in
// this listing - confirm against the original file.
58 TEST_F(TestCls2PCQueue
, AsyncGetCapacity
)
60 const std::string queue_name
= __PRETTY_FUNCTION__
;
61 const auto max_size
= 8*1024;
62 librados::ObjectWriteOperation wop
;
64 cls_2pc_queue_init(wop
, queue_name
, max_size
);
65 ASSERT_EQ(0, ioctx
.operate(queue_name
, &wop
));
67 librados::ObjectReadOperation rop
;
70 cls_2pc_queue_get_capacity(rop
, &bl
, &rc
);
71 ASSERT_EQ(0, ioctx
.operate(queue_name
, &rop
, nullptr));
74 ASSERT_EQ(cls_2pc_queue_get_capacity_result(bl
, size
), 0);
75 ASSERT_EQ(max_size
, size
);
// Makes number_of_ops reservations (size_to_reserve bytes, number_of_elements
// entries each) and expects the i-th returned id to be i+1. Then lists the
// reservations and checks the count, that no id is NO_ID, and that every
// timestamp is greater than zero.
78 TEST_F(TestCls2PCQueue
, Reserve
)
80 const std::string queue_name
= __PRETTY_FUNCTION__
;
81 const auto max_size
= 1024U*1024U;
82 const auto number_of_ops
= 10U;
83 const auto number_of_elements
= 23U;
84 const auto size_to_reserve
= 250U;
85 librados::ObjectWriteOperation op
;
87 cls_2pc_queue_init(op
, queue_name
, max_size
);
88 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
90 for (auto i
= 0U; i
< number_of_ops
; ++i
) {
91 cls_2pc_reservation::id_t res_id
;
92 ASSERT_EQ(cls_2pc_queue_reserve(ioctx
, queue_name
, size_to_reserve
, number_of_elements
, res_id
), 0);
93 ASSERT_EQ(res_id
, i
+1);
95 cls_2pc_reservations reservations
;
96 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx
, queue_name
, reservations
));
97 ASSERT_EQ(reservations
.size(), number_of_ops
);
98 for (const auto& r
: reservations
) {
99 ASSERT_NE(r
.first
, cls_2pc_reservation::NO_ID
);
100 ASSERT_GT(r
.second
.timestamp
.time_since_epoch().count(), 0);
// Operation-based variant of Reserve: each reservation is queued on a write
// operation executed with OPERATION_RETURNVEC, and the id is decoded with
// cls_2pc_queue_reserve_result(). The reservation list is fetched through a
// read operation and decoded with cls_2pc_queue_list_reservations_result().
// NOTE(review): declarations of `res_bl`, `res_rc`, `bl` and `rc` are not
// visible in this listing - confirm against the original file.
104 TEST_F(TestCls2PCQueue
, AsyncReserve
)
106 const std::string queue_name
= __PRETTY_FUNCTION__
;
107 const auto max_size
= 1024U*1024U;
108 constexpr auto number_of_ops
= 10U;
109 constexpr auto number_of_elements
= 23U;
110 const auto size_to_reserve
= 250U;
111 librados::ObjectWriteOperation wop
;
113 cls_2pc_queue_init(wop
, queue_name
, max_size
);
114 ASSERT_EQ(0, ioctx
.operate(queue_name
, &wop
));
116 for (auto i
= 0U; i
< number_of_ops
; ++i
) {
119 cls_2pc_queue_reserve(wop
, size_to_reserve
, number_of_elements
, &res_bl
, &res_rc
);
120 ASSERT_EQ(0, ioctx
.operate(queue_name
, &wop
, librados::OPERATION_RETURNVEC
));
121 ASSERT_EQ(res_rc
, 0);
122 cls_2pc_reservation::id_t res_id
;
123 ASSERT_EQ(0, cls_2pc_queue_reserve_result(res_bl
, res_id
));
124 ASSERT_EQ(res_id
, i
+1);
129 librados::ObjectReadOperation rop
;
130 cls_2pc_queue_list_reservations(rop
, &bl
, &rc
);
131 ASSERT_EQ(0, ioctx
.operate(queue_name
, &rop
, nullptr));
133 cls_2pc_reservations reservations
;
134 ASSERT_EQ(0, cls_2pc_queue_list_reservations_result(bl
, reservations
));
135 ASSERT_EQ(reservations
.size(), number_of_ops
);
136 for (const auto& r
: reservations
) {
137 ASSERT_NE(r
.first
, cls_2pc_reservation::NO_ID
);
138 ASSERT_GT(r
.second
.timestamp
.time_since_epoch().count(), 0);
// For each of number_of_ops iterations: builds number_of_elements buffers
// named "op-<i>-element-<j>", reserves their accumulated length, and commits
// them under the returned reservation id. Afterwards no reservation may
// remain listed.
// NOTE(review): the generator lambda uses `bl` without a visible declaration
// or return; this listing appears to omit lines - confirm.
142 TEST_F(TestCls2PCQueue
, Commit
)
144 const std::string queue_name
= __PRETTY_FUNCTION__
;
145 const auto max_size
= 1024*1024*128;
146 const auto number_of_ops
= 200U;
147 const auto number_of_elements
= 23U;
148 librados::ObjectWriteOperation op
;
150 cls_2pc_queue_init(op
, queue_name
, max_size
);
151 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
153 for (auto i
= 0U; i
< number_of_ops
; ++i
) {
154 const std::string
element_prefix("op-" +to_string(i
) + "-element-");
155 auto total_size
= 0UL;
156 std::vector
<bufferlist
> data(number_of_elements
);
157 // create vector of buffer lists
158 std::generate(data
.begin(), data
.end(), [j
= 0, &element_prefix
, &total_size
] () mutable {
160 bl
.append(element_prefix
+ to_string(j
++));
161 total_size
+= bl
.length();
165 cls_2pc_reservation::id_t res_id
;
166 ASSERT_EQ(cls_2pc_queue_reserve(ioctx
, queue_name
, total_size
, number_of_elements
, res_id
), 0);
167 ASSERT_NE(res_id
, cls_2pc_reservation::NO_ID
);
168 cls_2pc_queue_commit(op
, data
, res_id
);
169 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
171 cls_2pc_reservations reservations
;
172 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx
, queue_name
, reservations
));
173 ASSERT_EQ(reservations
.size(), 0);
// Makes number_of_ops reservations and aborts each one right after it is
// granted. Afterwards no reservation may remain listed.
176 TEST_F(TestCls2PCQueue
, Abort
)
178 const std::string queue_name
= __PRETTY_FUNCTION__
;
179 const auto max_size
= 1024U*1024U;
180 const auto number_of_ops
= 17U;
181 const auto number_of_elements
= 23U;
182 const auto size_to_reserve
= 250U;
183 librados::ObjectWriteOperation op
;
185 cls_2pc_queue_init(op
, queue_name
, max_size
);
186 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
188 for (auto i
= 0U; i
< number_of_ops
; ++i
) {
189 cls_2pc_reservation::id_t res_id
;
190 ASSERT_EQ(cls_2pc_queue_reserve(ioctx
, queue_name
, size_to_reserve
, number_of_elements
, res_id
), 0);
191 ASSERT_NE(res_id
, cls_2pc_reservation::NO_ID
);
192 cls_2pc_queue_abort(op
, res_id
);
193 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
195 cls_2pc_reservations reservations
;
196 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx
, queue_name
, reservations
));
197 ASSERT_EQ(reservations
.size(), 0);
// Exercises reservation failures. number_of_ops-1 reservations of
// size_to_reserve bytes are expected to succeed; the next one, a reservation
// of zero entries, and a reservation of zero bytes are each expected to fail.
// The listing must then contain exactly the number_of_ops-1 successful
// reservations, each with a valid id and a positive timestamp.
200 TEST_F(TestCls2PCQueue
, ReserveError
)
202 const std::string queue_name
= __PRETTY_FUNCTION__
;
203 const auto max_size
= 256U*1024U;
204 const auto number_of_ops
= 254U;
205 const auto number_of_elements
= 1U;
206 const auto size_to_reserve
= 1024U;
207 librados::ObjectWriteOperation op
;
209 cls_2pc_queue_init(op
, queue_name
, max_size
);
210 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
212 cls_2pc_reservation::id_t res_id
;
213 for (auto i
= 0U; i
< number_of_ops
-1; ++i
) {
214 ASSERT_EQ(cls_2pc_queue_reserve(ioctx
, queue_name
, size_to_reserve
, number_of_elements
, res_id
), 0);
215 ASSERT_NE(res_id
, cls_2pc_reservation::NO_ID
);
// reset the id so the failing call below can be shown to leave it untouched
217 res_id
= cls_2pc_reservation::NO_ID
;
218 // this one is failing because it exceeds the queue size
219 ASSERT_NE(cls_2pc_queue_reserve(ioctx
, queue_name
, size_to_reserve
, number_of_elements
, res_id
), 0);
220 ASSERT_EQ(res_id
, cls_2pc_reservation::NO_ID
);
222 // this one is failing because it tries to reserve 0 entries
223 ASSERT_NE(cls_2pc_queue_reserve(ioctx
, queue_name
, size_to_reserve
, 0, res_id
), 0);
224 // this one is failing because it tries to reserve 0 bytes
225 ASSERT_NE(cls_2pc_queue_reserve(ioctx
, queue_name
, 0, number_of_elements
, res_id
), 0);
227 cls_2pc_reservations reservations
;
228 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx
, queue_name
, reservations
));
229 ASSERT_EQ(reservations
.size(), number_of_ops
-1);
230 for (const auto& r
: reservations
) {
231 ASSERT_NE(r
.first
, cls_2pc_reservation::NO_ID
);
232 ASSERT_GT(r
.second
.timestamp
.time_since_epoch().count(), 0);
// Exercises commit failures. Every iteration reserves space for `data`. On
// iteration invalid_reservation_op the commit uses res_id+999, and on
// iteration invalid_elements_op it commits `invalid_data`, which holds
// number_of_elements+3 buffers; both commits are expected to fail. The test
// ends by expecting exactly two reservations to remain listed.
// NOTE(review): the regular commit near the end of the loop presumably sits
// in an `else` branch that this listing omits - confirm against the original.
236 TEST_F(TestCls2PCQueue
, CommitError
)
238 const std::string queue_name
= __PRETTY_FUNCTION__
;
239 const auto max_size
= 1024*1024;
240 const auto number_of_ops
= 17U;
241 const auto number_of_elements
= 23U;
242 librados::ObjectWriteOperation op
;
244 cls_2pc_queue_init(op
, queue_name
, max_size
);
245 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
247 const auto invalid_reservation_op
= 8;
248 const auto invalid_elements_op
= 11;
249 std::vector
<bufferlist
> invalid_data(number_of_elements
+3);
250 // create vector of buffer lists
251 std::generate(invalid_data
.begin(), invalid_data
.end(), [j
= 0] () mutable {
253 bl
.append("invalid data is larger that regular data" + to_string(j
++));
256 for (auto i
= 0U; i
< number_of_ops
; ++i
) {
257 const std::string
element_prefix("op-" +to_string(i
) + "-element-");
258 std::vector
<bufferlist
> data(number_of_elements
);
259 auto total_size
= 0UL;
260 // create vector of buffer lists
261 std::generate(data
.begin(), data
.end(), [j
= 0, &element_prefix
, &total_size
] () mutable {
263 bl
.append(element_prefix
+ to_string(j
++));
264 total_size
+= bl
.length();
268 cls_2pc_reservation::id_t res_id
;
269 ASSERT_EQ(cls_2pc_queue_reserve(ioctx
, queue_name
, total_size
, number_of_elements
, res_id
), 0);
270 ASSERT_NE(res_id
, cls_2pc_reservation::NO_ID
);
271 if (i
== invalid_reservation_op
) {
272 // fail on a commit with an invalid reservation id
273 cls_2pc_queue_commit(op
, data
, res_id
+999);
274 ASSERT_NE(0, ioctx
.operate(queue_name
, &op
));
275 } else if (i
== invalid_elements_op
) {
276 // fail on a commit when the data size is larger than the reserved one
277 cls_2pc_queue_commit(op
, invalid_data
, res_id
);
278 ASSERT_NE(0, ioctx
.operate(queue_name
, &op
));
280 cls_2pc_queue_commit(op
, data
, res_id
);
281 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
284 cls_2pc_reservations reservations
;
285 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx
, queue_name
, reservations
));
286 // 2 reservations were not committed
287 ASSERT_EQ(reservations
.size(), 2);
// Exercises aborting an unknown reservation. On iteration
// invalid_reservation_op the abort targets res_id+999; the operation is still
// expected to succeed. The test ends by expecting exactly one reservation to
// remain listed.
// NOTE(review): the regular abort presumably sits in an `else` branch that
// this listing omits - confirm against the original file.
290 TEST_F(TestCls2PCQueue
, AbortError
)
292 const std::string queue_name
= __PRETTY_FUNCTION__
;
293 const auto max_size
= 1024*1024;
294 const auto number_of_ops
= 17U;
295 const auto number_of_elements
= 23U;
296 const auto size_to_reserve
= 250U;
297 librados::ObjectWriteOperation op
;
299 cls_2pc_queue_init(op
, queue_name
, max_size
);
300 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
302 const auto invalid_reservation_op
= 8;
304 for (auto i
= 0U; i
< number_of_ops
; ++i
) {
305 cls_2pc_reservation::id_t res_id
;
306 ASSERT_EQ(cls_2pc_queue_reserve(ioctx
, queue_name
, size_to_reserve
, number_of_elements
, res_id
), 0);
307 ASSERT_NE(res_id
, cls_2pc_reservation::NO_ID
);
308 if (i
== invalid_reservation_op
) {
309 // aborting a reservation which does not exist
310 // is a no-op, not an error
311 cls_2pc_queue_abort(op
, res_id
+999);
313 cls_2pc_queue_abort(op
, res_id
);
315 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
317 cls_2pc_reservations reservations
;
318 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx
, queue_name
, reservations
));
319 // 1 reservation was not aborted
320 ASSERT_EQ(reservations
.size(), 1);
// Starts max_producer_count threads, each making number_of_ops reservations
// on the same queue. After joining them, the listing must hold
// number_of_ops*max_producer_count reservations whose `size` fields sum to
// that count times size_to_reserve.
323 TEST_F(TestCls2PCQueue
, MultiReserve
)
325 const std::string queue_name
= __PRETTY_FUNCTION__
;
326 const auto max_size
= 1024*1024;
327 const auto number_of_ops
= 11U;
328 const auto number_of_elements
= 23U;
329 const auto max_producer_count
= 10U;
330 const auto size_to_reserve
= 250U;
331 librados::ObjectWriteOperation op
;
333 cls_2pc_queue_init(op
, queue_name
, max_size
);
334 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
336 std::vector
<std::thread
> producers(max_producer_count
);
337 for (auto& p
: producers
) {
338 p
= std::thread([this, &queue_name
] {
339 librados::ObjectWriteOperation op
;
340 for (auto i
= 0U; i
< number_of_ops
; ++i
) {
341 cls_2pc_reservation::id_t res_id
= cls_2pc_reservation::NO_ID
;
342 ASSERT_EQ(cls_2pc_queue_reserve(ioctx
, queue_name
, size_to_reserve
, number_of_elements
, res_id
), 0);
343 ASSERT_NE(res_id
, 0);
348 std::for_each(producers
.begin(), producers
.end(), [](auto& p
) { p
.join(); });
350 cls_2pc_reservations reservations
;
351 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx
, queue_name
, reservations
));
352 ASSERT_EQ(reservations
.size(), number_of_ops
*max_producer_count
);
353 auto total_reservations
= 0U;
354 for (const auto& r
: reservations
) {
355 total_reservations
+= r
.second
.size
;
357 ASSERT_EQ(total_reservations
, number_of_ops
*max_producer_count
*size_to_reserve
);
// Starts max_producer_count threads; each one performs number_of_ops
// reserve-then-commit cycles on the same queue. After joining them, no
// reservation may remain listed.
// NOTE(review): the generator lambda uses `bl` without a visible declaration
// or return; this listing appears to omit lines - confirm.
360 TEST_F(TestCls2PCQueue
, MultiCommit
)
362 const std::string queue_name
= __PRETTY_FUNCTION__
;
363 const auto max_size
= 1024*1024;
364 const auto number_of_ops
= 11U;
365 const auto number_of_elements
= 23U;
366 const auto max_producer_count
= 10U;
367 librados::ObjectWriteOperation op
;
369 cls_2pc_queue_init(op
, queue_name
, max_size
);
370 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
372 std::vector
<std::thread
> producers(max_producer_count
);
373 for (auto& p
: producers
) {
374 p
= std::thread([this, &queue_name
] {
375 librados::ObjectWriteOperation op
;
376 for (auto i
= 0U; i
< number_of_ops
; ++i
) {
377 const std::string
element_prefix("op-" +to_string(i
) + "-element-");
378 std::vector
<bufferlist
> data(number_of_elements
);
379 auto total_size
= 0UL;
380 // create vector of buffer lists
381 std::generate(data
.begin(), data
.end(), [j
= 0, &element_prefix
, &total_size
] () mutable {
383 bl
.append(element_prefix
+ to_string(j
++));
384 total_size
+= bl
.length();
387 cls_2pc_reservation::id_t res_id
= cls_2pc_reservation::NO_ID
;
388 ASSERT_EQ(cls_2pc_queue_reserve(ioctx
, queue_name
, total_size
, number_of_elements
, res_id
), 0);
389 ASSERT_NE(res_id
, 0);
390 cls_2pc_queue_commit(op
, data
, res_id
);
391 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
396 std::for_each(producers
.begin(), producers
.end(), [](auto& p
) { p
.join(); });
398 cls_2pc_reservations reservations
;
399 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx
, queue_name
, reservations
));
400 ASSERT_EQ(reservations
.size(), 0);
// Starts max_producer_count threads; each one performs number_of_ops
// reserve-then-abort cycles on the same queue. After joining them, no
// reservation may remain listed.
403 TEST_F(TestCls2PCQueue
, MultiAbort
)
405 const std::string queue_name
= __PRETTY_FUNCTION__
;
406 const auto max_size
= 1024*1024;
407 const auto number_of_ops
= 11U;
408 const auto number_of_elements
= 23U;
409 const auto max_producer_count
= 10U;
410 const auto size_to_reserve
= 250U;
411 librados::ObjectWriteOperation op
;
413 cls_2pc_queue_init(op
, queue_name
, max_size
);
414 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
416 std::vector
<std::thread
> producers(max_producer_count
);
417 for (auto& p
: producers
) {
418 p
= std::thread([this, &queue_name
] {
419 librados::ObjectWriteOperation op
;
420 for (auto i
= 0U; i
< number_of_ops
; ++i
) {
421 cls_2pc_reservation::id_t res_id
= cls_2pc_reservation::NO_ID
;
422 ASSERT_EQ(cls_2pc_queue_reserve(ioctx
, queue_name
, size_to_reserve
, number_of_elements
, res_id
), 0);
423 ASSERT_NE(res_id
, 0);
424 cls_2pc_queue_abort(op
, res_id
);
425 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
430 std::for_each(producers
.begin(), producers
.end(), [](auto& p
) { p
.join(); });
432 cls_2pc_reservations reservations
;
433 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx
, queue_name
, reservations
));
434 ASSERT_EQ(reservations
.size(), 0);
// Runs max_workers reserver threads (number_of_ops reservations each)
// alongside one committer thread. The committer repeatedly lists the
// outstanding reservations and commits a freshly generated `data` vector
// under each listed id. At the end no reservation may remain listed.
// NOTE(review): the update of `remaining_ops` and the join of `committer` are
// not visible in this listing - confirm against the original file.
437 TEST_F(TestCls2PCQueue
, ReserveCommit
)
439 const std::string queue_name
= __PRETTY_FUNCTION__
;
440 const auto max_size
= 1024*1024;
441 const auto number_of_ops
= 11U;
442 const auto number_of_elements
= 23U;
443 const auto max_workers
= 10U;
444 const auto size_to_reserve
= 512U;
445 librados::ObjectWriteOperation op
;
447 cls_2pc_queue_init(op
, queue_name
, max_size
);
448 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
450 std::vector
<std::thread
> reservers(max_workers
);
451 for (auto& r
: reservers
) {
452 r
= std::thread([this, &queue_name
] {
453 librados::ObjectWriteOperation op
;
454 for (auto i
= 0U; i
< number_of_ops
; ++i
) {
455 cls_2pc_reservation::id_t res_id
= cls_2pc_reservation::NO_ID
;
456 ASSERT_EQ(cls_2pc_queue_reserve(ioctx
, queue_name
, size_to_reserve
, number_of_elements
, res_id
), 0);
457 ASSERT_NE(res_id
, cls_2pc_reservation::NO_ID
);
462 auto committer
= std::thread([this, &queue_name
] {
463 librados::ObjectWriteOperation op
;
464 int remaining_ops
= number_of_ops
*max_workers
;
465 while (remaining_ops
> 0) {
466 const std::string
element_prefix("op-" +to_string(remaining_ops
) + "-element-");
467 std::vector
<bufferlist
> data(number_of_elements
);
468 // create vector of buffer lists
469 std::generate(data
.begin(), data
.end(), [j
= 0, &element_prefix
] () mutable {
471 bl
.append(element_prefix
+ to_string(j
++));
474 cls_2pc_reservations reservations
;
475 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx
, queue_name
, reservations
));
476 for (const auto& r
: reservations
) {
477 cls_2pc_queue_commit(op
, data
, r
.first
);
478 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
484 std::for_each(reservers
.begin(), reservers
.end(), [](auto& r
) { r
.join(); });
487 cls_2pc_reservations reservations
;
488 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx
, queue_name
, reservations
));
489 ASSERT_EQ(reservations
.size(), 0);
// Runs max_workers reserver threads (number_of_ops reservations each)
// alongside one aborter thread. The aborter repeatedly lists the outstanding
// reservations and aborts each listed id. At the end no reservation may
// remain listed.
// NOTE(review): the update of `remaining_ops` and the join of `aborter` are
// not visible in this listing - confirm against the original file.
492 TEST_F(TestCls2PCQueue
, ReserveAbort
)
494 const std::string queue_name
= __PRETTY_FUNCTION__
;
495 const auto max_size
= 1024*1024;
496 const auto number_of_ops
= 17U;
497 const auto number_of_elements
= 23U;
498 const auto max_workers
= 10U;
499 const auto size_to_reserve
= 250U;
500 librados::ObjectWriteOperation op
;
502 cls_2pc_queue_init(op
, queue_name
, max_size
);
503 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
505 std::vector
<std::thread
> reservers(max_workers
);
506 for (auto& r
: reservers
) {
507 r
= std::thread([this, &queue_name
] {
508 librados::ObjectWriteOperation op
;
509 for (auto i
= 0U; i
< number_of_ops
; ++i
) {
510 cls_2pc_reservation::id_t res_id
= cls_2pc_reservation::NO_ID
;
511 ASSERT_EQ(cls_2pc_queue_reserve(ioctx
, queue_name
, size_to_reserve
, number_of_elements
, res_id
), 0);
512 ASSERT_NE(res_id
, cls_2pc_reservation::NO_ID
);
517 auto aborter
= std::thread([this, &queue_name
] {
518 librados::ObjectWriteOperation op
;
519 int remaining_ops
= number_of_ops
*max_workers
;
520 while (remaining_ops
> 0) {
521 cls_2pc_reservations reservations
;
522 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx
, queue_name
, reservations
));
523 for (const auto& r
: reservations
) {
524 cls_2pc_queue_abort(op
, r
.first
);
525 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
531 std::for_each(reservers
.begin(), reservers
.end(), [](auto& r
) { r
.join(); });
534 cls_2pc_reservations reservations
;
535 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx
, queue_name
, reservations
));
536 ASSERT_EQ(reservations
.size(), 0);
// Reserver threads make reservations 10ms apart while one aborter thread
// lists them: a reservation whose timestamp is later than `stale_time` is
// aborted and counted in cleaned_reservations; the commit path counts into
// committed_reservations. The test expects at least one cleaned reservation,
// cleaned + committed to equal number_of_ops*max_workers, and no reservation
// left listed.
// NOTE(review): the commit path presumably sits in an `else` branch, and the
// update of `remaining_ops` and the join of `aborter` are not visible in this
// listing - confirm against the original file.
539 TEST_F(TestCls2PCQueue
, ManualCleanup
)
541 const std::string queue_name
= __PRETTY_FUNCTION__
;
542 const auto max_size
= 128*1024*1024;
543 const auto number_of_ops
= 17U;
544 const auto number_of_elements
= 23U;
545 const auto max_workers
= 10U;
546 const auto size_to_reserve
= 512U;
547 librados::ObjectWriteOperation op
;
549 cls_2pc_queue_init(op
, queue_name
, max_size
);
550 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
552 // anything older than 100ms is considered stale
// NOTE(review): stale_time is "now + 100ms", and the check below treats
// timestamps *later* than stale_time as stale - confirm the intended direction.
553 ceph::coarse_real_time stale_time
= ceph::coarse_real_clock::now() + std::chrono::milliseconds(100);
555 std::vector
<std::thread
> reservers(max_workers
);
556 for (auto& r
: reservers
) {
557 r
= std::thread([this, &queue_name
] {
558 librados::ObjectWriteOperation op
;
559 for (auto i
= 0U; i
< number_of_ops
; ++i
) {
560 cls_2pc_reservation::id_t res_id
= cls_2pc_reservation::NO_ID
;
561 ASSERT_EQ(cls_2pc_queue_reserve(ioctx
, queue_name
, size_to_reserve
, number_of_elements
, res_id
), 0);
562 ASSERT_NE(res_id
, cls_2pc_reservation::NO_ID
);
563 // wait for 10ms between each reservation to make sure at least some are stale
564 std::this_thread::sleep_for(std::chrono::milliseconds(10));
569 auto cleaned_reservations
= 0U;
570 auto committed_reservations
= 0U;
571 auto aborter
= std::thread([this, &queue_name
, &stale_time
, &cleaned_reservations
, &committed_reservations
] {
572 librados::ObjectWriteOperation op
;
573 int remaining_ops
= number_of_ops
*max_workers
;
574 while (remaining_ops
> 0) {
575 cls_2pc_reservations reservations
;
576 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx
, queue_name
, reservations
));
577 for (const auto& r
: reservations
) {
578 if (r
.second
.timestamp
> stale_time
) {
579 // abort stale reservations
580 cls_2pc_queue_abort(op
, r
.first
);
581 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
582 ++cleaned_reservations
;
584 // commit good reservations
585 const std::string
element_prefix("op-" +to_string(remaining_ops
) + "-element-");
586 std::vector
<bufferlist
> data(number_of_elements
);
587 // create vector of buffer lists
588 std::generate(data
.begin(), data
.end(), [j
= 0, &element_prefix
] () mutable {
590 bl
.append(element_prefix
+ to_string(j
++));
593 cls_2pc_queue_commit(op
, data
, r
.first
);
594 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
595 ++committed_reservations
;
603 std::for_each(reservers
.begin(), reservers
.end(), [](auto& r
) { r
.join(); });
606 ASSERT_GT(cleaned_reservations
, 0);
607 ASSERT_EQ(committed_reservations
+ cleaned_reservations
, number_of_ops
*max_workers
);
608 cls_2pc_reservations reservations
;
609 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx
, queue_name
, reservations
));
610 ASSERT_EQ(reservations
.size(), 0);
// Reserver threads make reservations 10ms apart. After joining them the test
// lists all reservations, runs cls_2pc_queue_expire_reservations() with
// `stale_time`, and lists again. Every reservation missing from the second
// listing must have a timestamp <= stale_time, and every reservation still
// listed must have a timestamp > stale_time.
613 TEST_F(TestCls2PCQueue
, Cleanup
)
615 const std::string queue_name
= __PRETTY_FUNCTION__
;
616 const auto max_size
= 128*1024*1024;
617 const auto number_of_ops
= 15U;
618 const auto number_of_elements
= 23U;
619 const auto max_workers
= 10U;
620 const auto size_to_reserve
= 512U;
621 librados::ObjectWriteOperation op
;
623 cls_2pc_queue_init(op
, queue_name
, max_size
);
624 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
626 // anything older than 100ms is considered stale
627 ceph::coarse_real_time stale_time
= ceph::coarse_real_clock::now() + std::chrono::milliseconds(100);
629 std::vector
<std::thread
> reservers(max_workers
);
630 for (auto& r
: reservers
) {
631 r
= std::thread([this, &queue_name
] {
632 librados::ObjectWriteOperation op
;
633 for (auto i
= 0U; i
< number_of_ops
; ++i
) {
634 cls_2pc_reservation::id_t res_id
= cls_2pc_reservation::NO_ID
;
635 ASSERT_EQ(cls_2pc_queue_reserve(ioctx
, queue_name
, size_to_reserve
, number_of_elements
, res_id
), 0);
636 ASSERT_NE(res_id
, cls_2pc_reservation::NO_ID
);
637 // wait for 10ms between each reservation to make sure at least some are stale
638 std::this_thread::sleep_for(std::chrono::milliseconds(10));
643 std::for_each(reservers
.begin(), reservers
.end(), [](auto& r
) { r
.join(); });
645 cls_2pc_reservations all_reservations
;
646 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx
, queue_name
, all_reservations
));
647 ASSERT_EQ(all_reservations
.size(), number_of_ops
*max_workers
);
649 cls_2pc_queue_expire_reservations(op
, stale_time
);
650 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
652 cls_2pc_reservations good_reservations
;
653 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx
, queue_name
, good_reservations
));
655 for (const auto& r
: all_reservations
) {
656 if (good_reservations
.find(r
.first
) == good_reservations
.end()) {
657 // not in the "good" list
658 ASSERT_GE(stale_time
.time_since_epoch().count(),
659 r
.second
.timestamp
.time_since_epoch().count());
662 for (const auto& r
: good_reservations
) {
663 ASSERT_LT(stale_time
.time_since_epoch().count(),
664 r
.second
.timestamp
.time_since_epoch().count());
// max_producer_count producer threads each perform number_of_ops
// reserve-then-commit cycles while a single consumer thread lists up to
// max_elements entries at a time, adds their count to consume_count, and
// removes entries up to end_marker. The total consumed must equal
// number_of_ops*number_of_elements*max_producer_count.
// NOTE(review): `producer_count` is a plain (non-atomic) variable captured by
// reference in several threads; where it is modified, the check of `ret`, and
// the join of `consumer` are not visible in this listing - confirm against
// the original file.
668 TEST_F(TestCls2PCQueue
, MultiProducer
)
670 const std::string queue_name
= __PRETTY_FUNCTION__
;
671 const auto max_size
= 128*1024*1024;
672 const auto number_of_ops
= 300U;
673 const auto number_of_elements
= 23U;
674 const auto max_producer_count
= 10U;
675 librados::ObjectWriteOperation op
;
677 cls_2pc_queue_init(op
, queue_name
, max_size
);
678 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
680 auto producer_count
= max_producer_count
;
682 std::vector
<std::thread
> producers(max_producer_count
);
683 for (auto& p
: producers
) {
684 p
= std::thread([this, &queue_name
, &producer_count
] {
685 librados::ObjectWriteOperation op
;
686 for (auto i
= 0U; i
< number_of_ops
; ++i
) {
687 const std::string
element_prefix("op-" +to_string(i
) + "-element-");
688 std::vector
<bufferlist
> data(number_of_elements
);
689 auto total_size
= 0UL;
690 // create vector of buffer lists
691 std::generate(data
.begin(), data
.end(), [j
= 0, &element_prefix
, &total_size
] () mutable {
693 bl
.append(element_prefix
+ to_string(j
++));
694 total_size
+= bl
.length();
697 cls_2pc_reservation::id_t res_id
= cls_2pc_reservation::NO_ID
;
698 ASSERT_EQ(cls_2pc_queue_reserve(ioctx
, queue_name
, total_size
, number_of_elements
, res_id
), 0);
699 ASSERT_NE(res_id
, 0);
700 cls_2pc_queue_commit(op
, data
, res_id
);
701 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
707 auto consume_count
= 0U;
708 std::thread
consumer([this, &queue_name
, &consume_count
, &producer_count
] {
709 librados::ObjectWriteOperation op
;
710 const auto max_elements
= 42;
711 const std::string marker
;
712 bool truncated
= false;
713 std::string end_marker
;
714 std::vector
<cls_queue_entry
> entries
;
// keep draining while producers are active or the last listing was truncated
715 while (producer_count
> 0 || truncated
) {
716 const auto ret
= cls_2pc_queue_list_entries(ioctx
, queue_name
, marker
, max_elements
, entries
, &truncated
, end_marker
);
718 consume_count
+= entries
.size();
719 cls_2pc_queue_remove_entries(op
, end_marker
);
720 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
724 std::for_each(producers
.begin(), producers
.end(), [](auto& p
) { p
.join(); });
726 ASSERT_EQ(consume_count
, number_of_ops
*number_of_elements
*max_producer_count
);
// Fills the queue with number_of_ops reserve-then-commit cycles. It then
// lists entries through an ObjectReadOperation (up to max_elements per call),
// decodes them with cls_2pc_queue_list_entries_result(), and queues a matching
// remove on `wop`. After checking the consumed total, the queued removals run
// as one batch and the queue must list no entries.
// NOTE(review): the declarations of `marker`, `bl` and `rc`, and the loop
// around the listing step, are not visible in this listing - confirm against
// the original file.
729 TEST_F(TestCls2PCQueue
, AsyncConsumer
)
731 const std::string queue_name
= __PRETTY_FUNCTION__
;
732 constexpr auto max_size
= 128*1024*1024;
733 constexpr auto number_of_ops
= 250U;
734 constexpr auto number_of_elements
= 23U;
735 librados::ObjectWriteOperation wop
;
737 cls_2pc_queue_init(wop
, queue_name
, max_size
);
738 ASSERT_EQ(0, ioctx
.operate(queue_name
, &wop
));
741 for (auto i
= 0U; i
< number_of_ops
; ++i
) {
742 const std::string
element_prefix("op-" +to_string(i
) + "-element-");
743 std::vector
<bufferlist
> data(number_of_elements
);
744 auto total_size
= 0UL;
745 // create vector of buffer lists
746 std::generate(data
.begin(), data
.end(), [j
= 0, &element_prefix
, &total_size
] () mutable {
748 bl
.append(element_prefix
+ to_string(j
++));
749 total_size
+= bl
.length();
752 cls_2pc_reservation::id_t res_id
= cls_2pc_reservation::NO_ID
;
753 ASSERT_EQ(cls_2pc_queue_reserve(ioctx
, queue_name
, total_size
, number_of_elements
, res_id
), 0);
754 ASSERT_NE(res_id
, 0);
755 cls_2pc_queue_commit(wop
, data
, res_id
);
756 ASSERT_EQ(0, ioctx
.operate(queue_name
, &wop
));
759 constexpr auto max_elements
= 42;
761 std::string end_marker
;
762 librados::ObjectReadOperation rop
;
763 auto consume_count
= 0U;
764 std::vector
<cls_queue_entry
> entries
;
765 bool truncated
= true;
769 cls_2pc_queue_list_entries(rop
, marker
, max_elements
, &bl
, &rc
);
770 ASSERT_EQ(0, ioctx
.operate(queue_name
, &rop
, nullptr));
772 ASSERT_EQ(cls_2pc_queue_list_entries_result(bl
, entries
, &truncated
, end_marker
), 0);
773 consume_count
+= entries
.size();
774 cls_2pc_queue_remove_entries(wop
, end_marker
);
778 ASSERT_EQ(consume_count
, number_of_ops
*number_of_elements
);
779 // execute all delete operations in a batch
780 ASSERT_EQ(0, ioctx
.operate(queue_name
, &wop
));
781 // make sure that queue is empty
782 ASSERT_EQ(cls_2pc_queue_list_entries(ioctx
, queue_name
, marker
, max_elements
, entries
, &truncated
, end_marker
), 0);
783 ASSERT_EQ(entries
.size(), 0);
// max_workers producer threads reserve and commit against a queue of
// max_size bytes while max_workers/2 consumer threads list and remove
// entries. A reservation that fails must fail with -ENOSPC; the producer then
// sets retry_happened, sleeps 10ms and reserves again. After joining all
// threads, a warning is printed if no retry ever happened, and the queue must
// have no reservations and no entries left.
// NOTE(review): `producer_count` and `retry_happened` are plain (non-atomic)
// variables captured by reference in several threads; the retry loop header,
// the update of `producer_count` and the check of `ret` are not visible in
// this listing - confirm against the original file.
786 TEST_F(TestCls2PCQueue
, MultiProducerConsumer
)
788 const std::string queue_name
= __PRETTY_FUNCTION__
;
789 const auto max_size
= 1024*1024;
790 const auto number_of_ops
= 300U;
791 const auto number_of_elements
= 23U;
792 const auto max_workers
= 10U;
793 librados::ObjectWriteOperation op
;
795 cls_2pc_queue_init(op
, queue_name
, max_size
);
796 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
798 auto producer_count
= max_workers
;
800 auto retry_happened
= false;
802 std::vector
<std::thread
> producers(max_workers
);
803 for (auto& p
: producers
) {
804 p
= std::thread([this, &queue_name
, &producer_count
, &retry_happened
] {
805 librados::ObjectWriteOperation op
;
806 for (auto i
= 0U; i
< number_of_ops
; ++i
) {
807 const std::string
element_prefix("op-" +to_string(i
) + "-element-");
808 std::vector
<bufferlist
> data(number_of_elements
);
809 auto total_size
= 0UL;
810 // create vector of buffer lists
811 std::generate(data
.begin(), data
.end(), [j
= 0, &element_prefix
, &total_size
] () mutable {
813 bl
.append(element_prefix
+ to_string(j
++));
814 total_size
+= bl
.length();
817 cls_2pc_reservation::id_t res_id
= cls_2pc_reservation::NO_ID
;
818 auto rc
= cls_2pc_queue_reserve(ioctx
, queue_name
, total_size
, number_of_elements
, res_id
);
820 // other errors should cause test to fail
821 ASSERT_EQ(rc
, -ENOSPC
);
822 ASSERT_EQ(res_id
, 0);
823 // queue is full, sleep and retry
824 retry_happened
= true;
825 std::this_thread::sleep_for(std::chrono::milliseconds(10));
826 rc
= cls_2pc_queue_reserve(ioctx
, queue_name
, total_size
, number_of_elements
, res_id
);
828 ASSERT_NE(res_id
, 0);
829 cls_2pc_queue_commit(op
, data
, res_id
);
830 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
836 const auto max_elements
= 128;
837 std::vector
<std::thread
> consumers(max_workers
/2);
838 for (auto& c
: consumers
) {
839 c
= std::thread([this, &queue_name
, &producer_count
] {
840 librados::ObjectWriteOperation op
;
841 const std::string marker
;
842 bool truncated
= false;
843 std::string end_marker
;
844 std::vector
<cls_queue_entry
> entries
;
845 while (producer_count
> 0 || truncated
) {
846 const auto ret
= cls_2pc_queue_list_entries(ioctx
, queue_name
, marker
, max_elements
, entries
, &truncated
, end_marker
);
848 if (entries
.empty()) {
849 // queue is empty, let it fill
850 std::this_thread::sleep_for(std::chrono::milliseconds(100));
852 cls_2pc_queue_remove_entries(op
, end_marker
);
853 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
859 std::for_each(producers
.begin(), producers
.end(), [](auto& p
) { p
.join(); });
860 std::for_each(consumers
.begin(), consumers
.end(), [](auto& c
) { c
.join(); });
861 if (!retry_happened
) {
862 std::cerr
<< "Queue was never full - all reservations were sucessfull." <<
863 "Please decrease the amount of consumer threads" << std::endl
;
865 // make sure that queue is empty and no reservations remain
866 cls_2pc_reservations reservations
;
867 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx
, queue_name
, reservations
));
868 ASSERT_EQ(reservations
.size(), 0);
869 const std::string marker
;
870 bool truncated
= false;
871 std::string end_marker
;
872 std::vector
<cls_queue_entry
> entries
;
873 ASSERT_EQ(0, cls_2pc_queue_list_entries(ioctx
, queue_name
, marker
, max_elements
, entries
, &truncated
, end_marker
));
874 ASSERT_EQ(entries
.size(), 0);
// Makes number_of_ops (1024) reservations of size_to_reserve bytes and
// number_of_elements entries each, then checks that all of them are listed
// with a valid id and a positive timestamp.
// NOTE(review): presumably the large reservation count is meant to exercise
// a storage "spillover" path, as the test name suggests - nothing in this
// block shows it; confirm.
877 TEST_F(TestCls2PCQueue
, ReserveSpillover
)
879 const std::string queue_name
= __PRETTY_FUNCTION__
;
880 const auto max_size
= 1024U*1024U;
881 const auto number_of_ops
= 1024U;
882 const auto number_of_elements
= 8U;
883 const auto size_to_reserve
= 64U;
884 librados::ObjectWriteOperation op
;
886 cls_2pc_queue_init(op
, queue_name
, max_size
);
887 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
889 for (auto i
= 0U; i
< number_of_ops
; ++i
) {
890 cls_2pc_reservation::id_t res_id
;
891 ASSERT_EQ(cls_2pc_queue_reserve(ioctx
, queue_name
, size_to_reserve
, number_of_elements
, res_id
), 0);
892 ASSERT_NE(res_id
, cls_2pc_reservation::NO_ID
);
894 cls_2pc_reservations reservations
;
895 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx
, queue_name
, reservations
));
896 ASSERT_EQ(reservations
.size(), number_of_ops
);
897 for (const auto& r
: reservations
) {
898 ASSERT_NE(r
.first
, cls_2pc_reservation::NO_ID
);
899 ASSERT_GT(r
.second
.timestamp
.time_since_epoch().count(), 0);
// Makes number_of_ops (1024) reservations, lists them, and commits
// number_of_elements buffers ("foo<j>") under every listed id. A second
// listing must then be empty.
// NOTE(review): `total_size` is accumulated in the generator but has no
// visible use afterwards; the generator's `bl` declaration is also not
// visible in this listing - confirm against the original file.
903 TEST_F(TestCls2PCQueue
, CommitSpillover
)
905 const std::string queue_name
= __PRETTY_FUNCTION__
;
906 const auto max_size
= 1024U*1024U;
907 const auto number_of_ops
= 1024U;
908 const auto number_of_elements
= 4U;
909 const auto size_to_reserve
= 128U;
910 librados::ObjectWriteOperation op
;
912 cls_2pc_queue_init(op
, queue_name
, max_size
);
913 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
915 for (auto i
= 0U; i
< number_of_ops
; ++i
) {
916 cls_2pc_reservation::id_t res_id
;
917 ASSERT_EQ(cls_2pc_queue_reserve(ioctx
, queue_name
, size_to_reserve
, number_of_elements
, res_id
), 0);
918 ASSERT_NE(res_id
, cls_2pc_reservation::NO_ID
);
920 cls_2pc_reservations reservations
;
921 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx
, queue_name
, reservations
));
922 for (const auto& r
: reservations
) {
923 const std::string
element_prefix("foo");
924 std::vector
<bufferlist
> data(number_of_elements
);
925 auto total_size
= 0UL;
926 // create vector of buffer lists
927 std::generate(data
.begin(), data
.end(), [j
= 0, &element_prefix
, &total_size
] () mutable {
929 bl
.append(element_prefix
+ to_string(j
++));
930 total_size
+= bl
.length();
933 ASSERT_NE(r
.first
, cls_2pc_reservation::NO_ID
);
934 cls_2pc_queue_commit(op
, data
, r
.first
);
935 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
937 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx
, queue_name
, reservations
));
938 ASSERT_EQ(reservations
.size(), 0);
// Makes number_of_ops (1024) reservations, lists them, and aborts every
// listed id. A second listing must then be empty.
// NOTE(review): the closing braces of the loops and of the test body are not
// visible in this listing - confirm against the original file.
941 TEST_F(TestCls2PCQueue
, AbortSpillover
)
943 const std::string queue_name
= __PRETTY_FUNCTION__
;
944 const auto max_size
= 1024U*1024U;
945 const auto number_of_ops
= 1024U;
946 const auto number_of_elements
= 4U;
947 const auto size_to_reserve
= 128U;
948 librados::ObjectWriteOperation op
;
950 cls_2pc_queue_init(op
, queue_name
, max_size
);
951 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
953 for (auto i
= 0U; i
< number_of_ops
; ++i
) {
954 cls_2pc_reservation::id_t res_id
;
955 ASSERT_EQ(cls_2pc_queue_reserve(ioctx
, queue_name
, size_to_reserve
, number_of_elements
, res_id
), 0);
956 ASSERT_NE(res_id
, cls_2pc_reservation::NO_ID
);
958 cls_2pc_reservations reservations
;
959 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx
, queue_name
, reservations
));
960 for (const auto& r
: reservations
) {
961 ASSERT_NE(r
.first
, cls_2pc_reservation::NO_ID
);
962 cls_2pc_queue_abort(op
, r
.first
);
963 ASSERT_EQ(0, ioctx
.operate(queue_name
, &op
));
965 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx
, queue_name
, reservations
));
966 ASSERT_EQ(reservations
.size(), 0);