]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/cls_2pc_queue/test_cls_2pc_queue.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / test / cls_2pc_queue / test_cls_2pc_queue.cc
1 // -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/types.h"
5
6 #include "cls/2pc_queue/cls_2pc_queue_types.h"
7 #include "cls/2pc_queue/cls_2pc_queue_client.h"
8 #include "cls/queue/cls_queue_client.h"
9 #include "cls/2pc_queue/cls_2pc_queue_types.h"
10
11 #include "gtest/gtest.h"
12 #include "test/librados/test_cxx.h"
13 #include "global/global_context.h"
14
15 #include <string>
16 #include <vector>
17 #include <algorithm>
18 #include <thread>
19 #include <chrono>
20 #include <atomic>
21
22 using namespace std;
23
24 class TestCls2PCQueue : public ::testing::Test {
25 protected:
26 librados::Rados rados;
27 std::string pool_name;
28 librados::IoCtx ioctx;
29
30 void SetUp() override {
31 pool_name = get_temp_pool_name();
32 ASSERT_EQ("", create_one_pool_pp(pool_name, rados));
33 ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx));
34 }
35
36 void TearDown() override {
37 ioctx.close();
38 ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados));
39 }
40 };
41
42 TEST_F(TestCls2PCQueue, GetCapacity)
43 {
44 const std::string queue_name = __PRETTY_FUNCTION__;
45 const auto max_size = 8*1024;
46 librados::ObjectWriteOperation op;
47 op.create(true);
48 cls_2pc_queue_init(op, queue_name, max_size);
49 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
50
51 uint64_t size;
52
53 const int ret = cls_queue_get_capacity(ioctx, queue_name, size);
54 ASSERT_EQ(0, ret);
55 ASSERT_EQ(max_size, size);
56 }
57
58 TEST_F(TestCls2PCQueue, AsyncGetCapacity)
59 {
60 const std::string queue_name = __PRETTY_FUNCTION__;
61 const auto max_size = 8*1024;
62 librados::ObjectWriteOperation wop;
63 wop.create(true);
64 cls_2pc_queue_init(wop, queue_name, max_size);
65 ASSERT_EQ(0, ioctx.operate(queue_name, &wop));
66
67 librados::ObjectReadOperation rop;
68 bufferlist bl;
69 int rc;
70 cls_2pc_queue_get_capacity(rop, &bl, &rc);
71 ASSERT_EQ(0, ioctx.operate(queue_name, &rop, nullptr));
72 ASSERT_EQ(0, rc);
73 uint64_t size;
74 ASSERT_EQ(cls_2pc_queue_get_capacity_result(bl, size), 0);
75 ASSERT_EQ(max_size, size);
76 }
77
78 TEST_F(TestCls2PCQueue, Reserve)
79 {
80 const std::string queue_name = __PRETTY_FUNCTION__;
81 const auto max_size = 1024U*1024U;
82 const auto number_of_ops = 10U;
83 const auto number_of_elements = 23U;
84 const auto size_to_reserve = 250U;
85 librados::ObjectWriteOperation op;
86 op.create(true);
87 cls_2pc_queue_init(op, queue_name, max_size);
88 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
89
90 for (auto i = 0U; i < number_of_ops; ++i) {
91 cls_2pc_reservation::id_t res_id;
92 ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
93 ASSERT_EQ(res_id, i+1);
94 }
95 cls_2pc_reservations reservations;
96 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
97 ASSERT_EQ(reservations.size(), number_of_ops);
98 for (const auto& r : reservations) {
99 ASSERT_NE(r.first, cls_2pc_reservation::NO_ID);
100 ASSERT_GT(r.second.timestamp.time_since_epoch().count(), 0);
101 }
102 }
103
104 TEST_F(TestCls2PCQueue, AsyncReserve)
105 {
106 const std::string queue_name = __PRETTY_FUNCTION__;
107 const auto max_size = 1024U*1024U;
108 constexpr auto number_of_ops = 10U;
109 constexpr auto number_of_elements = 23U;
110 const auto size_to_reserve = 250U;
111 librados::ObjectWriteOperation wop;
112 wop.create(true);
113 cls_2pc_queue_init(wop, queue_name, max_size);
114 ASSERT_EQ(0, ioctx.operate(queue_name, &wop));
115
116 for (auto i = 0U; i < number_of_ops; ++i) {
117 bufferlist res_bl;
118 int res_rc;
119 cls_2pc_queue_reserve(wop, size_to_reserve, number_of_elements, &res_bl, &res_rc);
120 ASSERT_EQ(0, ioctx.operate(queue_name, &wop, librados::OPERATION_RETURNVEC));
121 ASSERT_EQ(res_rc, 0);
122 cls_2pc_reservation::id_t res_id;
123 ASSERT_EQ(0, cls_2pc_queue_reserve_result(res_bl, res_id));
124 ASSERT_EQ(res_id, i+1);
125 }
126
127 bufferlist bl;
128 int rc;
129 librados::ObjectReadOperation rop;
130 cls_2pc_queue_list_reservations(rop, &bl, &rc);
131 ASSERT_EQ(0, ioctx.operate(queue_name, &rop, nullptr));
132 ASSERT_EQ(0, rc);
133 cls_2pc_reservations reservations;
134 ASSERT_EQ(0, cls_2pc_queue_list_reservations_result(bl, reservations));
135 ASSERT_EQ(reservations.size(), number_of_ops);
136 for (const auto& r : reservations) {
137 ASSERT_NE(r.first, cls_2pc_reservation::NO_ID);
138 ASSERT_GT(r.second.timestamp.time_since_epoch().count(), 0);
139 }
140 }
141
142 TEST_F(TestCls2PCQueue, Commit)
143 {
144 const std::string queue_name = __PRETTY_FUNCTION__;
145 const auto max_size = 1024*1024*128;
146 const auto number_of_ops = 200U;
147 const auto number_of_elements = 23U;
148 librados::ObjectWriteOperation op;
149 op.create(true);
150 cls_2pc_queue_init(op, queue_name, max_size);
151 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
152
153 for (auto i = 0U; i < number_of_ops; ++i) {
154 const std::string element_prefix("op-" +to_string(i) + "-element-");
155 auto total_size = 0UL;
156 std::vector<bufferlist> data(number_of_elements);
157 // create vector of buffer lists
158 std::generate(data.begin(), data.end(), [j = 0, &element_prefix, &total_size] () mutable {
159 bufferlist bl;
160 bl.append(element_prefix + to_string(j++));
161 total_size += bl.length();
162 return bl;
163 });
164
165 cls_2pc_reservation::id_t res_id;
166 ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id), 0);
167 ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
168 cls_2pc_queue_commit(op, data, res_id);
169 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
170 }
171 cls_2pc_reservations reservations;
172 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
173 ASSERT_EQ(reservations.size(), 0);
174 }
175
176 TEST_F(TestCls2PCQueue, Abort)
177 {
178 const std::string queue_name = __PRETTY_FUNCTION__;
179 const auto max_size = 1024U*1024U;
180 const auto number_of_ops = 17U;
181 const auto number_of_elements = 23U;
182 const auto size_to_reserve = 250U;
183 librados::ObjectWriteOperation op;
184 op.create(true);
185 cls_2pc_queue_init(op, queue_name, max_size);
186 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
187
188 for (auto i = 0U; i < number_of_ops; ++i) {
189 cls_2pc_reservation::id_t res_id;
190 ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
191 ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
192 cls_2pc_queue_abort(op, res_id);
193 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
194 }
195 cls_2pc_reservations reservations;
196 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
197 ASSERT_EQ(reservations.size(), 0);
198 }
199
200 TEST_F(TestCls2PCQueue, ReserveError)
201 {
202 const std::string queue_name = __PRETTY_FUNCTION__;
203 const auto max_size = 256U*1024U;
204 const auto number_of_ops = 254U;
205 const auto number_of_elements = 1U;
206 const auto size_to_reserve = 1024U;
207 librados::ObjectWriteOperation op;
208 op.create(true);
209 cls_2pc_queue_init(op, queue_name, max_size);
210 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
211
212 cls_2pc_reservation::id_t res_id;
213 for (auto i = 0U; i < number_of_ops-1; ++i) {
214 ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
215 ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
216 }
217 res_id = cls_2pc_reservation::NO_ID;
218 // this one is failing because it exceeds the queue size
219 ASSERT_NE(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
220 ASSERT_EQ(res_id, cls_2pc_reservation::NO_ID);
221
222 // this one is failing because it tries to reserve 0 entries
223 ASSERT_NE(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, 0, res_id), 0);
224 // this one is failing because it tries to reserve 0 bytes
225 ASSERT_NE(cls_2pc_queue_reserve(ioctx, queue_name, 0, number_of_elements, res_id), 0);
226
227 cls_2pc_reservations reservations;
228 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
229 ASSERT_EQ(reservations.size(), number_of_ops-1);
230 for (const auto& r : reservations) {
231 ASSERT_NE(r.first, cls_2pc_reservation::NO_ID);
232 ASSERT_GT(r.second.timestamp.time_since_epoch().count(), 0);
233 }
234 }
235
236 TEST_F(TestCls2PCQueue, CommitError)
237 {
238 const std::string queue_name = __PRETTY_FUNCTION__;
239 const auto max_size = 1024*1024;
240 const auto number_of_ops = 17U;
241 const auto number_of_elements = 23U;
242 librados::ObjectWriteOperation op;
243 op.create(true);
244 cls_2pc_queue_init(op, queue_name, max_size);
245 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
246
247 const auto invalid_reservation_op = 8;
248 const auto invalid_elements_op = 11;
249 std::vector<bufferlist> invalid_data(number_of_elements+3);
250 // create vector of buffer lists
251 std::generate(invalid_data.begin(), invalid_data.end(), [j = 0] () mutable {
252 bufferlist bl;
253 bl.append("invalid data is larger that regular data" + to_string(j++));
254 return bl;
255 });
256 for (auto i = 0U; i < number_of_ops; ++i) {
257 const std::string element_prefix("op-" +to_string(i) + "-element-");
258 std::vector<bufferlist> data(number_of_elements);
259 auto total_size = 0UL;
260 // create vector of buffer lists
261 std::generate(data.begin(), data.end(), [j = 0, &element_prefix, &total_size] () mutable {
262 bufferlist bl;
263 bl.append(element_prefix + to_string(j++));
264 total_size += bl.length();
265 return bl;
266 });
267
268 cls_2pc_reservation::id_t res_id;
269 ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id), 0);
270 ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
271 if (i == invalid_reservation_op) {
272 // fail on a commits with invalid reservation id
273 cls_2pc_queue_commit(op, data, res_id+999);
274 ASSERT_NE(0, ioctx.operate(queue_name, &op));
275 } else if (i == invalid_elements_op) {
276 // fail on a commits when data size is larger than the reserved one
277 cls_2pc_queue_commit(op, invalid_data, res_id);
278 ASSERT_NE(0, ioctx.operate(queue_name, &op));
279 } else {
280 cls_2pc_queue_commit(op, data, res_id);
281 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
282 }
283 }
284 cls_2pc_reservations reservations;
285 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
286 // 2 reservations were not comitted
287 ASSERT_EQ(reservations.size(), 2);
288 }
289
290 TEST_F(TestCls2PCQueue, AbortError)
291 {
292 const std::string queue_name = __PRETTY_FUNCTION__;
293 const auto max_size = 1024*1024;
294 const auto number_of_ops = 17U;
295 const auto number_of_elements = 23U;
296 const auto size_to_reserve = 250U;
297 librados::ObjectWriteOperation op;
298 op.create(true);
299 cls_2pc_queue_init(op, queue_name, max_size);
300 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
301
302 const auto invalid_reservation_op = 8;
303
304 for (auto i = 0U; i < number_of_ops; ++i) {
305 cls_2pc_reservation::id_t res_id;
306 ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
307 ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
308 if (i == invalid_reservation_op) {
309 // aborting a reservation which does not exists
310 // is a no-op, not an error
311 cls_2pc_queue_abort(op, res_id+999);
312 } else {
313 cls_2pc_queue_abort(op, res_id);
314 }
315 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
316 }
317 cls_2pc_reservations reservations;
318 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
319 // 1 reservation was not aborted
320 ASSERT_EQ(reservations.size(), 1);
321 }
322
323 TEST_F(TestCls2PCQueue, MultiReserve)
324 {
325 const std::string queue_name = __PRETTY_FUNCTION__;
326 const auto max_size = 1024*1024;
327 const auto number_of_ops = 11U;
328 const auto number_of_elements = 23U;
329 const auto max_producer_count = 10U;
330 const auto size_to_reserve = 250U;
331 librados::ObjectWriteOperation op;
332 op.create(true);
333 cls_2pc_queue_init(op, queue_name, max_size);
334 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
335
336 std::vector<std::thread> producers(max_producer_count);
337 for (auto& p : producers) {
338 p = std::thread([this, &queue_name] {
339 librados::ObjectWriteOperation op;
340 for (auto i = 0U; i < number_of_ops; ++i) {
341 cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
342 ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
343 ASSERT_NE(res_id, 0);
344 }
345 });
346 }
347
348 std::for_each(producers.begin(), producers.end(), [](auto& p) { p.join(); });
349
350 cls_2pc_reservations reservations;
351 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
352 ASSERT_EQ(reservations.size(), number_of_ops*max_producer_count);
353 auto total_reservations = 0U;
354 for (const auto& r : reservations) {
355 total_reservations += r.second.size;
356 }
357 ASSERT_EQ(total_reservations, number_of_ops*max_producer_count*size_to_reserve);
358 }
359
360 TEST_F(TestCls2PCQueue, MultiCommit)
361 {
362 const std::string queue_name = __PRETTY_FUNCTION__;
363 const auto max_size = 1024*1024;
364 const auto number_of_ops = 11U;
365 const auto number_of_elements = 23U;
366 const auto max_producer_count = 10U;
367 librados::ObjectWriteOperation op;
368 op.create(true);
369 cls_2pc_queue_init(op, queue_name, max_size);
370 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
371
372 std::vector<std::thread> producers(max_producer_count);
373 for (auto& p : producers) {
374 p = std::thread([this, &queue_name] {
375 librados::ObjectWriteOperation op;
376 for (auto i = 0U; i < number_of_ops; ++i) {
377 const std::string element_prefix("op-" +to_string(i) + "-element-");
378 std::vector<bufferlist> data(number_of_elements);
379 auto total_size = 0UL;
380 // create vector of buffer lists
381 std::generate(data.begin(), data.end(), [j = 0, &element_prefix, &total_size] () mutable {
382 bufferlist bl;
383 bl.append(element_prefix + to_string(j++));
384 total_size += bl.length();
385 return bl;
386 });
387 cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
388 ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id), 0);
389 ASSERT_NE(res_id, 0);
390 cls_2pc_queue_commit(op, data, res_id);
391 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
392 }
393 });
394 }
395
396 std::for_each(producers.begin(), producers.end(), [](auto& p) { p.join(); });
397
398 cls_2pc_reservations reservations;
399 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
400 ASSERT_EQ(reservations.size(), 0);
401 }
402
403 TEST_F(TestCls2PCQueue, MultiAbort)
404 {
405 const std::string queue_name = __PRETTY_FUNCTION__;
406 const auto max_size = 1024*1024;
407 const auto number_of_ops = 11U;
408 const auto number_of_elements = 23U;
409 const auto max_producer_count = 10U;
410 const auto size_to_reserve = 250U;
411 librados::ObjectWriteOperation op;
412 op.create(true);
413 cls_2pc_queue_init(op, queue_name, max_size);
414 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
415
416 std::vector<std::thread> producers(max_producer_count);
417 for (auto& p : producers) {
418 p = std::thread([this, &queue_name] {
419 librados::ObjectWriteOperation op;
420 for (auto i = 0U; i < number_of_ops; ++i) {
421 cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
422 ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
423 ASSERT_NE(res_id, 0);
424 cls_2pc_queue_abort(op, res_id);
425 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
426 }
427 });
428 }
429
430 std::for_each(producers.begin(), producers.end(), [](auto& p) { p.join(); });
431
432 cls_2pc_reservations reservations;
433 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
434 ASSERT_EQ(reservations.size(), 0);
435 }
436
437 TEST_F(TestCls2PCQueue, ReserveCommit)
438 {
439 const std::string queue_name = __PRETTY_FUNCTION__;
440 const auto max_size = 1024*1024;
441 const auto number_of_ops = 11U;
442 const auto number_of_elements = 23U;
443 const auto max_workers = 10U;
444 const auto size_to_reserve = 512U;
445 librados::ObjectWriteOperation op;
446 op.create(true);
447 cls_2pc_queue_init(op, queue_name, max_size);
448 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
449
450 std::vector<std::thread> reservers(max_workers);
451 for (auto& r : reservers) {
452 r = std::thread([this, &queue_name] {
453 librados::ObjectWriteOperation op;
454 for (auto i = 0U; i < number_of_ops; ++i) {
455 cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
456 ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
457 ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
458 }
459 });
460 }
461
462 auto committer = std::thread([this, &queue_name] {
463 librados::ObjectWriteOperation op;
464 int remaining_ops = number_of_ops*max_workers;
465 while (remaining_ops > 0) {
466 const std::string element_prefix("op-" +to_string(remaining_ops) + "-element-");
467 std::vector<bufferlist> data(number_of_elements);
468 // create vector of buffer lists
469 std::generate(data.begin(), data.end(), [j = 0, &element_prefix] () mutable {
470 bufferlist bl;
471 bl.append(element_prefix + to_string(j++));
472 return bl;
473 });
474 cls_2pc_reservations reservations;
475 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
476 for (const auto& r : reservations) {
477 cls_2pc_queue_commit(op, data, r.first);
478 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
479 --remaining_ops;
480 }
481 }
482 });
483
484 std::for_each(reservers.begin(), reservers.end(), [](auto& r) { r.join(); });
485 committer.join();
486
487 cls_2pc_reservations reservations;
488 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
489 ASSERT_EQ(reservations.size(), 0);
490 }
491
492 TEST_F(TestCls2PCQueue, ReserveAbort)
493 {
494 const std::string queue_name = __PRETTY_FUNCTION__;
495 const auto max_size = 1024*1024;
496 const auto number_of_ops = 17U;
497 const auto number_of_elements = 23U;
498 const auto max_workers = 10U;
499 const auto size_to_reserve = 250U;
500 librados::ObjectWriteOperation op;
501 op.create(true);
502 cls_2pc_queue_init(op, queue_name, max_size);
503 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
504
505 std::vector<std::thread> reservers(max_workers);
506 for (auto& r : reservers) {
507 r = std::thread([this, &queue_name] {
508 librados::ObjectWriteOperation op;
509 for (auto i = 0U; i < number_of_ops; ++i) {
510 cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
511 ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
512 ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
513 }
514 });
515 }
516
517 auto aborter = std::thread([this, &queue_name] {
518 librados::ObjectWriteOperation op;
519 int remaining_ops = number_of_ops*max_workers;
520 while (remaining_ops > 0) {
521 cls_2pc_reservations reservations;
522 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
523 for (const auto& r : reservations) {
524 cls_2pc_queue_abort(op, r.first);
525 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
526 --remaining_ops;
527 }
528 }
529 });
530
531 std::for_each(reservers.begin(), reservers.end(), [](auto& r) { r.join(); });
532 aborter.join();
533
534 cls_2pc_reservations reservations;
535 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
536 ASSERT_EQ(reservations.size(), 0);
537 }
538
539 TEST_F(TestCls2PCQueue, ManualCleanup)
540 {
541 const std::string queue_name = __PRETTY_FUNCTION__;
542 const auto max_size = 128*1024*1024;
543 const auto number_of_ops = 17U;
544 const auto number_of_elements = 23U;
545 const auto max_workers = 10U;
546 const auto size_to_reserve = 512U;
547 librados::ObjectWriteOperation op;
548 op.create(true);
549 cls_2pc_queue_init(op, queue_name, max_size);
550 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
551
552 // anything older than 100ms is cosidered stale
553 ceph::coarse_real_time stale_time = ceph::coarse_real_clock::now() + std::chrono::milliseconds(100);
554
555 std::vector<std::thread> reservers(max_workers);
556 for (auto& r : reservers) {
557 r = std::thread([this, &queue_name] {
558 librados::ObjectWriteOperation op;
559 for (auto i = 0U; i < number_of_ops; ++i) {
560 cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
561 ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
562 ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
563 // wait for 10ms between each reservation to make sure at least some are stale
564 std::this_thread::sleep_for(std::chrono::milliseconds(10));
565 }
566 });
567 }
568
569 auto cleaned_reservations = 0U;
570 auto committed_reservations = 0U;
571 auto aborter = std::thread([this, &queue_name, &stale_time, &cleaned_reservations, &committed_reservations] {
572 librados::ObjectWriteOperation op;
573 int remaining_ops = number_of_ops*max_workers;
574 while (remaining_ops > 0) {
575 cls_2pc_reservations reservations;
576 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
577 for (const auto& r : reservations) {
578 if (r.second.timestamp > stale_time) {
579 // abort stale reservations
580 cls_2pc_queue_abort(op, r.first);
581 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
582 ++cleaned_reservations;
583 } else {
584 // commit good reservations
585 const std::string element_prefix("op-" +to_string(remaining_ops) + "-element-");
586 std::vector<bufferlist> data(number_of_elements);
587 // create vector of buffer lists
588 std::generate(data.begin(), data.end(), [j = 0, &element_prefix] () mutable {
589 bufferlist bl;
590 bl.append(element_prefix + to_string(j++));
591 return bl;
592 });
593 cls_2pc_queue_commit(op, data, r.first);
594 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
595 ++committed_reservations;
596 }
597 --remaining_ops;
598 }
599 }
600 });
601
602
603 std::for_each(reservers.begin(), reservers.end(), [](auto& r) { r.join(); });
604 aborter.join();
605
606 ASSERT_GT(cleaned_reservations, 0);
607 ASSERT_EQ(committed_reservations + cleaned_reservations, number_of_ops*max_workers);
608 cls_2pc_reservations reservations;
609 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
610 ASSERT_EQ(reservations.size(), 0);
611 }
612
613 TEST_F(TestCls2PCQueue, Cleanup)
614 {
615 const std::string queue_name = __PRETTY_FUNCTION__;
616 const auto max_size = 128*1024*1024;
617 const auto number_of_ops = 15U;
618 const auto number_of_elements = 23U;
619 const auto max_workers = 10U;
620 const auto size_to_reserve = 512U;
621 librados::ObjectWriteOperation op;
622 op.create(true);
623 cls_2pc_queue_init(op, queue_name, max_size);
624 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
625
626 // anything older than 100ms is cosidered stale
627 ceph::coarse_real_time stale_time = ceph::coarse_real_clock::now() + std::chrono::milliseconds(100);
628
629 std::vector<std::thread> reservers(max_workers);
630 for (auto& r : reservers) {
631 r = std::thread([this, &queue_name] {
632 librados::ObjectWriteOperation op;
633 for (auto i = 0U; i < number_of_ops; ++i) {
634 cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
635 ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
636 ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
637 // wait for 10ms between each reservation to make sure at least some are stale
638 std::this_thread::sleep_for(std::chrono::milliseconds(10));
639 }
640 });
641 }
642
643 std::for_each(reservers.begin(), reservers.end(), [](auto& r) { r.join(); });
644
645 cls_2pc_reservations all_reservations;
646 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, all_reservations));
647 ASSERT_EQ(all_reservations.size(), number_of_ops*max_workers);
648
649 cls_2pc_queue_expire_reservations(op, stale_time);
650 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
651
652 cls_2pc_reservations good_reservations;
653 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, good_reservations));
654
655 for (const auto& r : all_reservations) {
656 if (good_reservations.find(r.first) == good_reservations.end()) {
657 // not in the "good" list
658 ASSERT_GE(stale_time.time_since_epoch().count(),
659 r.second.timestamp.time_since_epoch().count());
660 }
661 }
662 for (const auto& r : good_reservations) {
663 ASSERT_LT(stale_time.time_since_epoch().count(),
664 r.second.timestamp.time_since_epoch().count());
665 }
666 }
667
668 TEST_F(TestCls2PCQueue, MultiProducer)
669 {
670 const std::string queue_name = __PRETTY_FUNCTION__;
671 const auto max_size = 128*1024*1024;
672 const auto number_of_ops = 300U;
673 const auto number_of_elements = 23U;
674 const auto max_producer_count = 10U;
675 librados::ObjectWriteOperation op;
676 op.create(true);
677 cls_2pc_queue_init(op, queue_name, max_size);
678 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
679
680 auto producer_count = max_producer_count;
681
682 std::vector<std::thread> producers(max_producer_count);
683 for (auto& p : producers) {
684 p = std::thread([this, &queue_name, &producer_count] {
685 librados::ObjectWriteOperation op;
686 for (auto i = 0U; i < number_of_ops; ++i) {
687 const std::string element_prefix("op-" +to_string(i) + "-element-");
688 std::vector<bufferlist> data(number_of_elements);
689 auto total_size = 0UL;
690 // create vector of buffer lists
691 std::generate(data.begin(), data.end(), [j = 0, &element_prefix, &total_size] () mutable {
692 bufferlist bl;
693 bl.append(element_prefix + to_string(j++));
694 total_size += bl.length();
695 return bl;
696 });
697 cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
698 ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id), 0);
699 ASSERT_NE(res_id, 0);
700 cls_2pc_queue_commit(op, data, res_id);
701 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
702 }
703 --producer_count;
704 });
705 }
706
707 auto consume_count = 0U;
708 std::thread consumer([this, &queue_name, &consume_count, &producer_count] {
709 librados::ObjectWriteOperation op;
710 const auto max_elements = 42;
711 const std::string marker;
712 bool truncated = false;
713 std::string end_marker;
714 std::vector<cls_queue_entry> entries;
715 while (producer_count > 0 || truncated) {
716 const auto ret = cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
717 ASSERT_EQ(0, ret);
718 consume_count += entries.size();
719 cls_2pc_queue_remove_entries(op, end_marker);
720 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
721 }
722 });
723
724 std::for_each(producers.begin(), producers.end(), [](auto& p) { p.join(); });
725 consumer.join();
726 ASSERT_EQ(consume_count, number_of_ops*number_of_elements*max_producer_count);
727 }
728
729 TEST_F(TestCls2PCQueue, AsyncConsumer)
730 {
731 const std::string queue_name = __PRETTY_FUNCTION__;
732 constexpr auto max_size = 128*1024*1024;
733 constexpr auto number_of_ops = 250U;
734 constexpr auto number_of_elements = 23U;
735 librados::ObjectWriteOperation wop;
736 wop.create(true);
737 cls_2pc_queue_init(wop, queue_name, max_size);
738 ASSERT_EQ(0, ioctx.operate(queue_name, &wop));
739
740
741 for (auto i = 0U; i < number_of_ops; ++i) {
742 const std::string element_prefix("op-" +to_string(i) + "-element-");
743 std::vector<bufferlist> data(number_of_elements);
744 auto total_size = 0UL;
745 // create vector of buffer lists
746 std::generate(data.begin(), data.end(), [j = 0, &element_prefix, &total_size] () mutable {
747 bufferlist bl;
748 bl.append(element_prefix + to_string(j++));
749 total_size += bl.length();
750 return bl;
751 });
752 cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
753 ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id), 0);
754 ASSERT_NE(res_id, 0);
755 cls_2pc_queue_commit(wop, data, res_id);
756 ASSERT_EQ(0, ioctx.operate(queue_name, &wop));
757 }
758
759 constexpr auto max_elements = 42;
760 std::string marker;
761 std::string end_marker;
762 librados::ObjectReadOperation rop;
763 auto consume_count = 0U;
764 std::vector<cls_queue_entry> entries;
765 bool truncated = true;
766 while (truncated) {
767 bufferlist bl;
768 int rc;
769 cls_2pc_queue_list_entries(rop, marker, max_elements, &bl, &rc);
770 ASSERT_EQ(0, ioctx.operate(queue_name, &rop, nullptr));
771 ASSERT_EQ(rc, 0);
772 ASSERT_EQ(cls_2pc_queue_list_entries_result(bl, entries, &truncated, end_marker), 0);
773 consume_count += entries.size();
774 cls_2pc_queue_remove_entries(wop, end_marker);
775 marker = end_marker;
776 }
777
778 ASSERT_EQ(consume_count, number_of_ops*number_of_elements);
779 // execute all delete operations in a batch
780 ASSERT_EQ(0, ioctx.operate(queue_name, &wop));
781 // make sure that queue is empty
782 ASSERT_EQ(cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker), 0);
783 ASSERT_EQ(entries.size(), 0);
784 }
785
786 TEST_F(TestCls2PCQueue, MultiProducerConsumer)
787 {
788 const std::string queue_name = __PRETTY_FUNCTION__;
789 const auto max_size = 1024*1024;
790 const auto number_of_ops = 300U;
791 const auto number_of_elements = 23U;
792 const auto max_workers = 10U;
793 librados::ObjectWriteOperation op;
794 op.create(true);
795 cls_2pc_queue_init(op, queue_name, max_size);
796 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
797
798 auto producer_count = max_workers;
799
800 auto retry_happened = false;
801
802 std::vector<std::thread> producers(max_workers);
803 for (auto& p : producers) {
804 p = std::thread([this, &queue_name, &producer_count, &retry_happened] {
805 librados::ObjectWriteOperation op;
806 for (auto i = 0U; i < number_of_ops; ++i) {
807 const std::string element_prefix("op-" +to_string(i) + "-element-");
808 std::vector<bufferlist> data(number_of_elements);
809 auto total_size = 0UL;
810 // create vector of buffer lists
811 std::generate(data.begin(), data.end(), [j = 0, &element_prefix, &total_size] () mutable {
812 bufferlist bl;
813 bl.append(element_prefix + to_string(j++));
814 total_size += bl.length();
815 return bl;
816 });
817 cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
818 auto rc = cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id);
819 while (rc != 0) {
820 // other errors should cause test to fail
821 ASSERT_EQ(rc, -ENOSPC);
822 ASSERT_EQ(res_id, 0);
823 // queue is full, sleep and retry
824 retry_happened = true;
825 std::this_thread::sleep_for(std::chrono::milliseconds(10));
826 rc = cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id);
827 };
828 ASSERT_NE(res_id, 0);
829 cls_2pc_queue_commit(op, data, res_id);
830 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
831 }
832 --producer_count;
833 });
834 }
835
836 const auto max_elements = 128;
837 std::vector<std::thread> consumers(max_workers/2);
838 for (auto& c : consumers) {
839 c = std::thread([this, &queue_name, &producer_count] {
840 librados::ObjectWriteOperation op;
841 const std::string marker;
842 bool truncated = false;
843 std::string end_marker;
844 std::vector<cls_queue_entry> entries;
845 while (producer_count > 0 || truncated) {
846 const auto ret = cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
847 ASSERT_EQ(0, ret);
848 if (entries.empty()) {
849 // queue is empty, let it fill
850 std::this_thread::sleep_for(std::chrono::milliseconds(100));
851 } else {
852 cls_2pc_queue_remove_entries(op, end_marker);
853 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
854 }
855 }
856 });
857 }
858
859 std::for_each(producers.begin(), producers.end(), [](auto& p) { p.join(); });
860 std::for_each(consumers.begin(), consumers.end(), [](auto& c) { c.join(); });
861 if (!retry_happened) {
862 std::cerr << "Queue was never full - all reservations were sucessfull." <<
863 "Please decrease the amount of consumer threads" << std::endl;
864 }
865 // make sure that queue is empty and no reservations remain
866 cls_2pc_reservations reservations;
867 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
868 ASSERT_EQ(reservations.size(), 0);
869 const std::string marker;
870 bool truncated = false;
871 std::string end_marker;
872 std::vector<cls_queue_entry> entries;
873 ASSERT_EQ(0, cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker));
874 ASSERT_EQ(entries.size(), 0);
875 }
876
877 TEST_F(TestCls2PCQueue, ReserveSpillover)
878 {
879 const std::string queue_name = __PRETTY_FUNCTION__;
880 const auto max_size = 1024U*1024U;
881 const auto number_of_ops = 1024U;
882 const auto number_of_elements = 8U;
883 const auto size_to_reserve = 64U;
884 librados::ObjectWriteOperation op;
885 op.create(true);
886 cls_2pc_queue_init(op, queue_name, max_size);
887 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
888
889 for (auto i = 0U; i < number_of_ops; ++i) {
890 cls_2pc_reservation::id_t res_id;
891 ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
892 ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
893 }
894 cls_2pc_reservations reservations;
895 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
896 ASSERT_EQ(reservations.size(), number_of_ops);
897 for (const auto& r : reservations) {
898 ASSERT_NE(r.first, cls_2pc_reservation::NO_ID);
899 ASSERT_GT(r.second.timestamp.time_since_epoch().count(), 0);
900 }
901 }
902
903 TEST_F(TestCls2PCQueue, CommitSpillover)
904 {
905 const std::string queue_name = __PRETTY_FUNCTION__;
906 const auto max_size = 1024U*1024U;
907 const auto number_of_ops = 1024U;
908 const auto number_of_elements = 4U;
909 const auto size_to_reserve = 128U;
910 librados::ObjectWriteOperation op;
911 op.create(true);
912 cls_2pc_queue_init(op, queue_name, max_size);
913 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
914
915 for (auto i = 0U; i < number_of_ops; ++i) {
916 cls_2pc_reservation::id_t res_id;
917 ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
918 ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
919 }
920 cls_2pc_reservations reservations;
921 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
922 for (const auto& r : reservations) {
923 const std::string element_prefix("foo");
924 std::vector<bufferlist> data(number_of_elements);
925 auto total_size = 0UL;
926 // create vector of buffer lists
927 std::generate(data.begin(), data.end(), [j = 0, &element_prefix, &total_size] () mutable {
928 bufferlist bl;
929 bl.append(element_prefix + to_string(j++));
930 total_size += bl.length();
931 return bl;
932 });
933 ASSERT_NE(r.first, cls_2pc_reservation::NO_ID);
934 cls_2pc_queue_commit(op, data, r.first);
935 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
936 }
937 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
938 ASSERT_EQ(reservations.size(), 0);
939 }
940
941 TEST_F(TestCls2PCQueue, AbortSpillover)
942 {
943 const std::string queue_name = __PRETTY_FUNCTION__;
944 const auto max_size = 1024U*1024U;
945 const auto number_of_ops = 1024U;
946 const auto number_of_elements = 4U;
947 const auto size_to_reserve = 128U;
948 librados::ObjectWriteOperation op;
949 op.create(true);
950 cls_2pc_queue_init(op, queue_name, max_size);
951 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
952
953 for (auto i = 0U; i < number_of_ops; ++i) {
954 cls_2pc_reservation::id_t res_id;
955 ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
956 ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
957 }
958 cls_2pc_reservations reservations;
959 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
960 for (const auto& r : reservations) {
961 ASSERT_NE(r.first, cls_2pc_reservation::NO_ID);
962 cls_2pc_queue_abort(op, r.first);
963 ASSERT_EQ(0, ioctx.operate(queue_name, &op));
964 }
965 ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
966 ASSERT_EQ(reservations.size(), 0);
967 }
968