]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/cls_fifo/test_cls_fifo.cc
c7a40413bb87b49e64a83d7e974dd9948b150db9
[ceph.git] / ceph / src / test / cls_fifo / test_cls_fifo.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2019 Red Hat, Inc.
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
#include <cerrno>
#include <cstring>
#include <iostream>
#include <string_view>

#include <boost/asio.hpp>
#include <boost/system/error_code.hpp>

#include <spawn/spawn.hpp>

#include "include/scope_guard.h"
#include "include/types.h"
#include "include/neorados/RADOS.hpp"

#include "cls/fifo/cls_fifo_ops.h"

#include "neorados/cls/fifo.h"

#include "test/neorados/common_tests.h"

#include "gtest/gtest.h"
35
36 using namespace std;
37
38 namespace R = neorados;
39 namespace ba = boost::asio;
40 namespace bs = boost::system;
41 namespace cb = ceph::buffer;
42 namespace fifo = rados::cls::fifo;
43 namespace RCf = neorados::cls::fifo;
44 namespace s = spawn;
45
46 namespace {
47 void fifo_create(R::RADOS& r,
48 const R::IOContext& ioc,
49 const R::Object& oid,
50 std::string_view id,
51 s::yield_context y,
52 std::optional<fifo::objv> objv = std::nullopt,
53 std::optional<std::string_view> oid_prefix = std::nullopt,
54 bool exclusive = false,
55 std::uint64_t max_part_size = RCf::default_max_part_size,
56 std::uint64_t max_entry_size = RCf::default_max_entry_size)
57 {
58 R::WriteOp op;
59 RCf::create_meta(op, id, objv, oid_prefix, exclusive, max_part_size,
60 max_entry_size);
61 r.execute(oid, ioc, std::move(op), y);
62 }
63 }
64
65 TEST(ClsFIFO, TestCreate) {
66 ba::io_context c;
67 auto fifo_id = "fifo"sv;
68 R::Object oid(fifo_id);
69
70 s::spawn(c, [&](s::yield_context y) {
71 auto r = R::RADOS::Builder{}.build(c, y);
72 auto pool = create_pool(r, get_temp_pool_name(), y);
73 auto sg = make_scope_guard(
74 [&] {
75 r.delete_pool(pool, y);
76 });
77 R::IOContext ioc(pool);
78 bs::error_code ec;
79 fifo_create(r, ioc, oid, ""s, y[ec]);
80 EXPECT_EQ(bs::errc::invalid_argument, ec);
81 fifo_create(r, ioc, oid, fifo_id, y[ec], std::nullopt,
82 std::nullopt, false, 0);
83 EXPECT_EQ(bs::errc::invalid_argument, ec);
84 fifo_create(r, ioc, oid, {}, y[ec],
85 std::nullopt, std::nullopt,
86 false, RCf::default_max_part_size, 0);
87 EXPECT_EQ(bs::errc::invalid_argument, ec);
88 fifo_create(r, ioc, oid, fifo_id, y);
89 {
90 std::uint64_t size;
91 std::uint64_t size2;
92 {
93 R::ReadOp op;
94 op.stat(&size, nullptr);
95 r.execute(oid, ioc, std::move(op),
96 nullptr, y);
97 EXPECT_GT(size, 0);
98 }
99
100 {
101 R::ReadOp op;
102 op.stat(&size2, nullptr);
103 r.execute(oid, ioc, std::move(op), nullptr, y);
104 }
105 EXPECT_EQ(size2, size);
106 }
107 /* test idempotency */
108 fifo_create(r, ioc, oid, fifo_id, y);
109 fifo_create(r, ioc, oid, {}, y[ec], std::nullopt,
110 std::nullopt, false);
111 EXPECT_EQ(bs::errc::invalid_argument, ec);
112 fifo_create(r, ioc, oid, {}, y[ec], std::nullopt,
113 "myprefix"sv, false);
114 EXPECT_EQ(bs::errc::invalid_argument, ec);
115 fifo_create(r, ioc, oid, "foo"sv, y[ec],
116 std::nullopt, std::nullopt, false);
117 EXPECT_EQ(bs::errc::file_exists, ec);
118 });
119 c.run();
120 }
121
122 TEST(ClsFIFO, TestGetInfo) {
123 ba::io_context c;
124 auto fifo_id = "fifo"sv;
125 R::Object oid(fifo_id);
126
127 s::spawn(c, [&](s::yield_context y) {
128 auto r = R::RADOS::Builder{}.build(c, y);
129 auto pool = create_pool(r, get_temp_pool_name(), y);
130 auto sg = make_scope_guard(
131 [&] {
132 r.delete_pool(pool, y);
133 });
134 R::IOContext ioc(pool);
135 /* first successful create */
136 fifo_create(r, ioc, oid, fifo_id, y);
137
138 fifo::info info;
139 std::uint32_t part_header_size;
140 std::uint32_t part_entry_overhead;
141 {
142 R::ReadOp op;
143 RCf::get_meta(op, std::nullopt,
144 nullptr, &info, &part_header_size,
145 &part_entry_overhead);
146 r.execute(oid, ioc, std::move(op), nullptr, y);
147 EXPECT_GT(part_header_size, 0);
148 EXPECT_GT(part_entry_overhead, 0);
149 EXPECT_FALSE(info.version.instance.empty());
150 }
151 {
152 R::ReadOp op;
153 RCf::get_meta(op, info.version,
154 nullptr, &info, &part_header_size,
155 &part_entry_overhead);
156 r.execute(oid, ioc, std::move(op), nullptr, y);
157 }
158 {
159 R::ReadOp op;
160 fifo::objv objv;
161 objv.instance = "foo";
162 objv.ver = 12;
163 RCf::get_meta(op, objv,
164 nullptr, &info, &part_header_size,
165 &part_entry_overhead);
166 ASSERT_ANY_THROW(r.execute(oid, ioc, std::move(op),
167 nullptr, y));
168 }
169 });
170 c.run();
171 }
172
173 TEST(FIFO, TestOpenDefault) {
174 ba::io_context c;
175 auto fifo_id = "fifo"s;
176
177 s::spawn(c, [&](s::yield_context y) {
178 auto r = R::RADOS::Builder{}.build(c, y);
179 auto pool = create_pool(r, get_temp_pool_name(), y);
180 auto sg = make_scope_guard(
181 [&] {
182 r.delete_pool(pool, y);
183 });
184 R::IOContext ioc(pool);
185 auto fifo = RCf::FIFO::create(r, ioc, fifo_id, y);
186 // force reading from backend
187 fifo->read_meta(y);
188 auto info = fifo->meta();
189 EXPECT_EQ(info.id, fifo_id);
190 });
191 c.run();
192 }
193
194 TEST(FIFO, TestOpenParams) {
195 ba::io_context c;
196 auto fifo_id = "fifo"sv;
197
198 s::spawn(c, [&](s::yield_context y) {
199 auto r = R::RADOS::Builder{}.build(c, y);
200 auto pool = create_pool(r, get_temp_pool_name(), y);
201 auto sg = make_scope_guard(
202 [&] {
203 r.delete_pool(pool, y);
204 });
205 R::IOContext ioc(pool);
206
207 const std::uint64_t max_part_size = 10 * 1024;
208 const std::uint64_t max_entry_size = 128;
209 auto oid_prefix = "foo.123."sv;
210 fifo::objv objv;
211 objv.instance = "fooz"s;
212 objv.ver = 10;
213
214 /* first successful create */
215 auto f = RCf::FIFO::create(r, ioc, fifo_id, y, objv, oid_prefix,
216 false, max_part_size,
217 max_entry_size);
218
219
220 /* force reading from backend */
221 f->read_meta(y);
222 auto info = f->meta();
223 ASSERT_EQ(info.id, fifo_id);
224 ASSERT_EQ(info.params.max_part_size, max_part_size);
225 ASSERT_EQ(info.params.max_entry_size, max_entry_size);
226 ASSERT_EQ(info.version, objv);
227 });
228 c.run();
229 }
230
231 namespace {
232 template<class T>
233 std::pair<T, std::string> decode_entry(const RCf::list_entry& entry)
234 {
235 T val;
236 auto iter = entry.data.cbegin();
237 decode(val, iter);
238 return std::make_pair(std::move(val), entry.marker);
239 }
240 }
241
242
243
244 TEST(FIFO, TestPushListTrim) {
245 ba::io_context c;
246 auto fifo_id = "fifo"sv;
247
248 s::spawn(c, [&](s::yield_context y) mutable {
249 auto r = R::RADOS::Builder{}.build(c, y);
250 auto pool = create_pool(r, get_temp_pool_name(), y);
251 auto sg = make_scope_guard(
252 [&] {
253 r.delete_pool(pool, y);
254 });
255 R::IOContext ioc(pool);
256 auto f = RCf::FIFO::create(r, ioc, fifo_id, y);
257 static constexpr auto max_entries = 10u;
258 for (uint32_t i = 0; i < max_entries; ++i) {
259 cb::list bl;
260 encode(i, bl);
261 f->push(bl, y);
262 }
263
264 std::optional<std::string> marker;
265 /* get entries one by one */
266
267 for (auto i = 0u; i < max_entries; ++i) {
268 auto [result, more] = f->list(1, marker, y);
269
270 bool expected_more = (i != (max_entries - 1));
271 ASSERT_EQ(expected_more, more);
272 ASSERT_EQ(1, result.size());
273
274 std::uint32_t val;
275 std::tie(val, marker) =
276 decode_entry<std::uint32_t>(result.front());
277
278 ASSERT_EQ(i, val);
279 }
280
281 /* get all entries at once */
282 std::string markers[max_entries];
283 std::uint32_t min_entry = 0;
284 {
285 auto [result, more] = f->list(max_entries * 10, std::nullopt,
286 y);
287
288 ASSERT_FALSE(more);
289 ASSERT_EQ(max_entries, result.size());
290
291
292 for (auto i = 0u; i < max_entries; ++i) {
293 std::uint32_t val;
294
295 std::tie(val, markers[i]) =
296 decode_entry<std::uint32_t>(result[i]);
297 ASSERT_EQ(i, val);
298 }
299
300
301 /* trim one entry */
302 f->trim(markers[min_entry], false, y);
303 ++min_entry;
304 }
305
306 auto [result, more] = f->list(max_entries * 10,
307 std::nullopt, y);
308
309 ASSERT_FALSE(more);
310 ASSERT_EQ(max_entries - min_entry, result.size());
311
312 for (auto i = min_entry; i < max_entries; ++i) {
313 std::uint32_t val;
314
315 std::tie(val, markers[i - min_entry]) =
316 decode_entry<std::uint32_t>(result[i - min_entry]);
317 ASSERT_EQ(i, val);
318 }
319
320 });
321 c.run();
322 }
323
324
325 TEST(FIFO, TestPushTooBig) {
326 ba::io_context c;
327 auto fifo_id = "fifo"sv;
328 static constexpr auto max_part_size = 2048ull;
329 static constexpr auto max_entry_size = 128ull;
330
331 s::spawn(c, [&](s::yield_context y) {
332 auto r = R::RADOS::Builder{}.build(c, y);
333 auto pool = create_pool(r, get_temp_pool_name(), y);
334 auto sg = make_scope_guard(
335 [&] {
336 r.delete_pool(pool, y);
337 });
338 R::IOContext ioc(pool);
339
340 auto f = RCf::FIFO::create(r, ioc, fifo_id, y, std::nullopt,
341 std::nullopt, false, max_part_size,
342 max_entry_size);
343
344 char buf[max_entry_size + 1];
345 memset(buf, 0, sizeof(buf));
346
347 cb::list bl;
348 bl.append(buf, sizeof(buf));
349
350 bs::error_code ec;
351 f->push(bl, y[ec]);
352 EXPECT_EQ(RCf::errc::entry_too_large, ec);
353 });
354 c.run();
355 }
356
357
358 TEST(FIFO, TestMultipleParts) {
359 ba::io_context c;
360 auto fifo_id = "fifo"sv;
361 static constexpr auto max_part_size = 2048ull;
362 static constexpr auto max_entry_size = 128ull;
363
364 s::spawn(c, [&](s::yield_context y) mutable {
365 auto r = R::RADOS::Builder{}.build(c, y);
366 auto pool = create_pool(r, get_temp_pool_name(), y);
367 auto sg = make_scope_guard(
368 [&] {
369 r.delete_pool(pool, y);
370 });
371 R::IOContext ioc(pool);
372
373 auto f = RCf::FIFO::create(r, ioc, fifo_id, y, std::nullopt,
374 std::nullopt, false, max_part_size,
375 max_entry_size);
376
377
378 char buf[max_entry_size];
379 memset(buf, 0, sizeof(buf));
380
381 const auto [part_header_size, part_entry_overhead] =
382 f->get_part_layout_info();
383
384 const auto entries_per_part =
385 (max_part_size - part_header_size) /
386 (max_entry_size + part_entry_overhead);
387
388 const auto max_entries = entries_per_part * 4 + 1;
389
390 /* push enough entries */
391 for (auto i = 0u; i < max_entries; ++i) {
392 cb::list bl;
393
394 *(int *)buf = i;
395 bl.append(buf, sizeof(buf));
396
397 f->push(bl, y);
398 }
399
400 auto info = f->meta();
401
402 ASSERT_EQ(info.id, fifo_id);
403 /* head should have advanced */
404 ASSERT_GT(info.head_part_num, 0);
405
406
407 /* list all at once */
408 auto [result, more] = f->list(max_entries, std::nullopt, y);
409 EXPECT_EQ(false, more);
410
411 ASSERT_EQ(max_entries, result.size());
412
413 for (auto i = 0u; i < max_entries; ++i) {
414 auto& bl = result[i].data;
415 ASSERT_EQ(i, *(int *)bl.c_str());
416 }
417
418 std::optional<std::string> marker;
419 /* get entries one by one */
420
421 for (auto i = 0u; i < max_entries; ++i) {
422 auto [result, more] = f->list(1, marker, y);
423 ASSERT_EQ(result.size(), 1);
424 const bool expected_more = (i != (max_entries - 1));
425 ASSERT_EQ(expected_more, more);
426
427 std::uint32_t val;
428 std::tie(val, marker) =
429 decode_entry<std::uint32_t>(result.front());
430
431 auto& entry = result.front();
432 auto& bl = entry.data;
433 ASSERT_EQ(i, *(int *)bl.c_str());
434 marker = entry.marker;
435 }
436
437 /* trim one at a time */
438 marker.reset();
439 for (auto i = 0u; i < max_entries; ++i) {
440 /* read single entry */
441 {
442 auto [result, more] = f->list(1, marker, y);
443 ASSERT_EQ(result.size(), 1);
444 const bool expected_more = (i != (max_entries - 1));
445 ASSERT_EQ(expected_more, more);
446
447 marker = result.front().marker;
448
449 f->trim(*marker, false, y);
450 }
451
452 /* check tail */
453 info = f->meta();
454 ASSERT_EQ(info.tail_part_num, i / entries_per_part);
455
456 /* try to read all again, see how many entries left */
457 auto [result, more] = f->list(max_entries, marker, y);
458 ASSERT_EQ(max_entries - i - 1, result.size());
459 ASSERT_EQ(false, more);
460 }
461
462 /* tail now should point at head */
463 info = f->meta();
464 ASSERT_EQ(info.head_part_num, info.tail_part_num);
465
466 /* check old tails are removed */
467 for (auto i = 0; i < info.tail_part_num; ++i) {
468 bs::error_code ec;
469 f->get_part_info(i, y[ec]);
470 ASSERT_EQ(bs::errc::no_such_file_or_directory, ec);
471 }
472 /* check current tail exists */
473 f->get_part_info(info.tail_part_num, y);
474 });
475 c.run();
476 }
477
478
479 TEST(FIFO, TestTwoPushers) {
480 ba::io_context c;
481 auto fifo_id = "fifo"sv;
482 static constexpr auto max_part_size = 2048ull;
483 static constexpr auto max_entry_size = 128ull;
484
485 s::spawn(c, [&](s::yield_context y) {
486 auto r = R::RADOS::Builder{}.build(c, y);
487 auto pool = create_pool(r, get_temp_pool_name(), y);
488 auto sg = make_scope_guard(
489 [&] {
490 r.delete_pool(pool, y);
491 });
492 R::IOContext ioc(pool);
493
494 auto f = RCf::FIFO::create(r, ioc, fifo_id, y, std::nullopt,
495 std::nullopt, false, max_part_size,
496 max_entry_size);
497
498
499
500 char buf[max_entry_size];
501 memset(buf, 0, sizeof(buf));
502
503
504 auto [part_header_size, part_entry_overhead] =
505 f->get_part_layout_info();
506
507 const auto entries_per_part =
508 (max_part_size - part_header_size) /
509 (max_entry_size + part_entry_overhead);
510
511 const auto max_entries = entries_per_part * 4 + 1;
512
513 auto f2 = RCf::FIFO::open(r, ioc, fifo_id, y);
514
515 std::vector fifos{&f, &f2};
516
517 for (auto i = 0u; i < max_entries; ++i) {
518 cb::list bl;
519 *(int *)buf = i;
520 bl.append(buf, sizeof(buf));
521
522 auto& f = fifos[i % fifos.size()];
523
524 (*f)->push(bl, y);
525 }
526
527 /* list all by both */
528 {
529 auto [result, more] = f2->list(max_entries, std::nullopt, y);
530
531 ASSERT_EQ(false, more);
532 ASSERT_EQ(max_entries, result.size());
533 }
534 auto [result, more] = f2->list(max_entries, std::nullopt, y);
535 ASSERT_EQ(false, more);
536 ASSERT_EQ(max_entries, result.size());
537
538 for (auto i = 0u; i < max_entries; ++i) {
539 auto& bl = result[i].data;
540 ASSERT_EQ(i, *(int *)bl.c_str());
541 }
542 });
543 c.run();
544 }
545
546
547 TEST(FIFO, TestTwoPushersTrim) {
548 ba::io_context c;
549 auto fifo_id = "fifo"sv;
550 static constexpr auto max_part_size = 2048ull;
551 static constexpr auto max_entry_size = 128ull;
552
553 s::spawn(c, [&](s::yield_context y) {
554 auto r = R::RADOS::Builder{}.build(c, y);
555 auto pool = create_pool(r, get_temp_pool_name(), y);
556 auto sg = make_scope_guard(
557 [&] {
558 r.delete_pool(pool, y);
559 });
560 R::IOContext ioc(pool);
561
562 auto f1 = RCf::FIFO::create(r, ioc, fifo_id, y, std::nullopt,
563 std::nullopt, false, max_part_size,
564 max_entry_size);
565
566 char buf[max_entry_size];
567 memset(buf, 0, sizeof(buf));
568
569
570 auto [part_header_size, part_entry_overhead] =
571 f1->get_part_layout_info();
572
573 const auto entries_per_part =
574 (max_part_size - part_header_size) /
575 (max_entry_size + part_entry_overhead);
576
577 const auto max_entries = entries_per_part * 4 + 1;
578
579 auto f2 = RCf::FIFO::open(r, ioc, fifo_id, y);
580
581 /* push one entry to f2 and the rest to f1 */
582
583 for (auto i = 0u; i < max_entries; ++i) {
584 cb::list bl;
585
586 *(int *)buf = i;
587 bl.append(buf, sizeof(buf));
588
589 auto f = (i < 1 ? &f2 : &f1);
590 (*f)->push(bl, y);
591 }
592
593 /* trim half by fifo1 */
594 auto num = max_entries / 2;
595
596 std::string marker;
597 {
598 auto [result, more] = f1->list(num, std::nullopt, y);
599
600 ASSERT_EQ(true, more);
601 ASSERT_EQ(num, result.size());
602
603 for (auto i = 0u; i < num; ++i) {
604 auto& bl = result[i].data;
605 ASSERT_EQ(i, *(int *)bl.c_str());
606 }
607
608 auto& entry = result[num - 1];
609 marker = entry.marker;
610
611 f1->trim(marker, false, y);
612
613 /* list what's left by fifo2 */
614
615 }
616
617 const auto left = max_entries - num;
618 auto [result, more] = f2->list(left, marker, y);
619 ASSERT_EQ(left, result.size());
620 ASSERT_EQ(false, more);
621
622 for (auto i = num; i < max_entries; ++i) {
623 auto& bl = result[i - num].data;
624 ASSERT_EQ(i, *(int *)bl.c_str());
625 }
626 });
627 c.run();
628 }
629
630 TEST(FIFO, TestPushBatch) {
631 ba::io_context c;
632 auto fifo_id = "fifo"sv;
633 static constexpr auto max_part_size = 2048ull;
634 static constexpr auto max_entry_size = 128ull;
635
636 s::spawn(c, [&](s::yield_context y) {
637 auto r = R::RADOS::Builder{}.build(c, y);
638 auto pool = create_pool(r, get_temp_pool_name(), y);
639 auto sg = make_scope_guard(
640 [&] {
641 r.delete_pool(pool, y);
642 });
643 R::IOContext ioc(pool);
644
645 auto f = RCf::FIFO::create(r, ioc, fifo_id, y, std::nullopt,
646 std::nullopt, false, max_part_size,
647 max_entry_size);
648
649
650 char buf[max_entry_size];
651 memset(buf, 0, sizeof(buf));
652
653 auto [part_header_size, part_entry_overhead]
654 = f->get_part_layout_info();
655
656 auto entries_per_part =
657 (max_part_size - part_header_size) /
658 (max_entry_size + part_entry_overhead);
659
660 auto max_entries = entries_per_part * 4 + 1; /* enough entries to span multiple parts */
661
662 std::vector<cb::list> bufs;
663
664 for (auto i = 0u; i < max_entries; ++i) {
665 cb::list bl;
666
667 *(int *)buf = i;
668 bl.append(buf, sizeof(buf));
669
670 bufs.push_back(bl);
671 }
672
673 f->push(bufs, y);
674
675 /* list all */
676
677 auto [result, more] = f->list(max_entries, std::nullopt, y);
678 ASSERT_EQ(false, more);
679 ASSERT_EQ(max_entries, result.size());
680
681 for (auto i = 0u; i < max_entries; ++i) {
682 auto& bl = result[i].data;
683 ASSERT_EQ(i, *(int *)bl.c_str());
684 }
685
686 auto& info = f->meta();
687 ASSERT_EQ(info.head_part_num, 4);
688 });
689 c.run();
690 }
691
692 TEST(FIFO, TestTrimExclusive) {
693 ba::io_context c;
694 auto fifo_id = "fifo"sv;
695
696 s::spawn(c, [&](s::yield_context y) mutable {
697 auto r = R::RADOS::Builder{}.build(c, y);
698 auto pool = create_pool(r, get_temp_pool_name(), y);
699 auto sg = make_scope_guard(
700 [&] {
701 r.delete_pool(pool, y);
702 });
703 R::IOContext ioc(pool);
704 auto f = RCf::FIFO::create(r, ioc, fifo_id, y);
705 static constexpr auto max_entries = 10u;
706 for (uint32_t i = 0; i < max_entries; ++i) {
707 cb::list bl;
708 encode(i, bl);
709 f->push(bl, y);
710 }
711
712 {
713 auto [result, more] = f->list(1, std::nullopt, y);
714 auto [val, marker] =
715 decode_entry<std::uint32_t>(result.front());
716 ASSERT_EQ(0, val);
717 f->trim(marker, true, y);
718 }
719 {
720 auto [result, more] = f->list(max_entries, std::nullopt, y);
721 auto [val, marker] = decode_entry<std::uint32_t>(result.front());
722 ASSERT_EQ(0, val);
723 f->trim(result[4].marker, true, y);
724 }
725 {
726 auto [result, more] = f->list(max_entries, std::nullopt, y);
727 auto [val, marker] =
728 decode_entry<std::uint32_t>(result.front());
729 ASSERT_EQ(4, val);
730 f->trim(result.back().marker, true, y);
731 }
732 {
733 auto [result, more] = f->list(max_entries, std::nullopt, y);
734 auto [val, marker] =
735 decode_entry<std::uint32_t>(result.front());
736 ASSERT_EQ(result.size(), 1);
737 ASSERT_EQ(max_entries - 1, val);
738 }
739 });
740 c.run();
741 }