1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2019 Red Hat, Inc.
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
17 #include <string_view>
19 #include <boost/asio.hpp>
20 #include <boost/system/error_code.hpp>
22 #include <spawn/spawn.hpp>
24 #include "include/scope_guard.h"
25 #include "include/types.h"
26 #include "include/neorados/RADOS.hpp"
28 #include "cls/fifo/cls_fifo_ops.h"
30 #include "neorados/cls/fifo.h"
32 #include "test/neorados/common_tests.h"
34 #include "gtest/gtest.h"
// Short namespace aliases used by the helper functions and tests in this file.
// NOTE(review): the tests below also call `s::spawn` / `s::yield_context`, but
// no alias named `s` appears in this listing -- presumably it is declared
// next to these aliases; confirm against the full file.
38 namespace R
= neorados
;
39 namespace ba
= boost::asio
;
40 namespace bs
= boost::system
;
41 namespace cb
= ceph::buffer
;
42 namespace fifo
= rados::cls::fifo
;
43 namespace RCf
= neorados::cls::fifo
;
// Test helper: fills an operation with RCf::create_meta() and runs it against
// `oid` in `ioc` through r.execute().
//
// Optional arguments (all defaulted): objv, oid_prefix, exclusive,
// max_part_size (RCf::default_max_part_size) and max_entry_size
// (RCf::default_max_entry_size).
//
// NOTE(review): the declarations of `oid`, `id`, `y` and the local `op` are
// not visible in this listing. Call sites in this file pass
// (r, ioc, oid, id, y, ...) in that order -- confirm against the full file.
47 void fifo_create(R::RADOS
& r
,
48 const R::IOContext
& ioc
,
52 std::optional
<fifo::objv
> objv
= std::nullopt
,
53 std::optional
<std::string_view
> oid_prefix
= std::nullopt
,
54 bool exclusive
= false,
55 std::uint64_t max_part_size
= RCf::default_max_part_size
,
56 std::uint64_t max_entry_size
= RCf::default_max_entry_size
)
/* build the create_meta operation from the supplied parameters */
59 RCf::create_meta(op
, id
, objv
, oid_prefix
, exclusive
, max_part_size
,
/* submit the operation on the target object */
61 r
.execute(oid
, ioc
, std::move(op
), y
);
// Exercises fifo_create() argument validation and repeat-create behaviour:
//  - empty id, zero max_part_size and zero max_entry_size are each expected
//    to yield bs::errc::invalid_argument;
//  - a plain create with a valid id is issued, the object is stat'ed twice
//    and both sizes are expected to match;
//  - creating again with the same id is issued without capturing an error;
//  - creating with an empty ({}) id is expected to yield invalid_argument,
//    and creating with a different id ("foo") is expected to yield
//    bs::errc::file_exists.
// The pool is created per test and removed via the scope guard.
65 TEST(ClsFIFO
, TestCreate
) {
67 auto fifo_id
= "fifo"sv
;
68 R::Object
oid(fifo_id
);
70 s::spawn(c
, [&](s::yield_context y
) {
71 auto r
= R::RADOS::Builder
{}.build(c
, y
);
72 auto pool
= create_pool(r
, get_temp_pool_name(), y
);
/* scope guard deletes the temporary pool */
73 auto sg
= make_scope_guard(
75 r
.delete_pool(pool
, y
);
77 R::IOContext
ioc(pool
);
/* an empty id is expected to be rejected */
79 fifo_create(r
, ioc
, oid
, ""s
, y
[ec
]);
80 EXPECT_EQ(bs::errc::invalid_argument
, ec
);
/* max_part_size == 0 is expected to be rejected */
81 fifo_create(r
, ioc
, oid
, fifo_id
, y
[ec
], std::nullopt
,
82 std::nullopt
, false, 0);
83 EXPECT_EQ(bs::errc::invalid_argument
, ec
);
/* max_entry_size == 0 (with an empty id) is expected to be rejected */
84 fifo_create(r
, ioc
, oid
, {}, y
[ec
],
85 std::nullopt
, std::nullopt
,
86 false, RCf::default_max_part_size
, 0);
87 EXPECT_EQ(bs::errc::invalid_argument
, ec
);
/* valid arguments; no error_code is captured for this call */
88 fifo_create(r
, ioc
, oid
, fifo_id
, y
);
/* stat the object twice and compare the two reported sizes */
94 op
.stat(&size
, nullptr);
95 r
.execute(oid
, ioc
, std::move(op
),
102 op
.stat(&size2
, nullptr);
103 r
.execute(oid
, ioc
, std::move(op
), nullptr, y
);
105 EXPECT_EQ(size2
, size
);
107 /* test idempotency */
108 fifo_create(r
, ioc
, oid
, fifo_id
, y
);
/* empty id on an existing object: invalid_argument expected */
109 fifo_create(r
, ioc
, oid
, {}, y
[ec
], std::nullopt
,
110 std::nullopt
, false);
111 EXPECT_EQ(bs::errc::invalid_argument
, ec
);
/* empty id together with an explicit prefix: invalid_argument expected */
112 fifo_create(r
, ioc
, oid
, {}, y
[ec
], std::nullopt
,
113 "myprefix"sv
, false);
114 EXPECT_EQ(bs::errc::invalid_argument
, ec
);
/* a different id on the same object: file_exists expected */
115 fifo_create(r
, ioc
, oid
, "foo"sv
, y
[ec
],
116 std::nullopt
, std::nullopt
, false);
117 EXPECT_EQ(bs::errc::file_exists
, ec
);
// Exercises RCf::get_meta() on a freshly created FIFO object:
//  - with no version given (std::nullopt), the returned part_header_size and
//    part_entry_overhead are expected to be > 0 and info.version.instance is
//    expected to be non-empty;
//  - the call is repeated passing the version that was just read back;
//  - with an objv whose instance is set to "foo", executing the operation is
//    expected to throw (ASSERT_ANY_THROW).
// NOTE(review): the declarations of `op`, `info` and `objv` are not visible
// in this listing.
122 TEST(ClsFIFO
, TestGetInfo
) {
124 auto fifo_id
= "fifo"sv
;
125 R::Object
oid(fifo_id
);
127 s::spawn(c
, [&](s::yield_context y
) {
128 auto r
= R::RADOS::Builder
{}.build(c
, y
);
129 auto pool
= create_pool(r
, get_temp_pool_name(), y
);
/* scope guard deletes the temporary pool */
130 auto sg
= make_scope_guard(
132 r
.delete_pool(pool
, y
);
134 R::IOContext
ioc(pool
);
135 /* first successful create */
136 fifo_create(r
, ioc
, oid
, fifo_id
, y
);
139 std::uint32_t part_header_size
;
140 std::uint32_t part_entry_overhead
;
/* read metadata without specifying a version */
143 RCf::get_meta(op
, std::nullopt
,
144 nullptr, &info
, &part_header_size
,
145 &part_entry_overhead
);
146 r
.execute(oid
, ioc
, std::move(op
), nullptr, y
);
147 EXPECT_GT(part_header_size
, 0);
148 EXPECT_GT(part_entry_overhead
, 0);
149 EXPECT_FALSE(info
.version
.instance
.empty());
/* read metadata again, this time passing the version just returned */
153 RCf::get_meta(op
, info
.version
,
154 nullptr, &info
, &part_header_size
,
155 &part_entry_overhead
);
156 r
.execute(oid
, ioc
, std::move(op
), nullptr, y
);
/* a version with instance "foo" is expected to make the read throw */
161 objv
.instance
= "foo";
163 RCf::get_meta(op
, objv
,
164 nullptr, &info
, &part_header_size
,
165 &part_entry_overhead
);
166 ASSERT_ANY_THROW(r
.execute(oid
, ioc
, std::move(op
),
// Creates a FIFO through RCf::FIFO::create() with only the required
// arguments and checks that the id reported by meta() equals the id it was
// created with.
// NOTE(review): the statement that follows the "force reading from backend"
// comment is not visible in this listing.
173 TEST(FIFO
, TestOpenDefault
) {
175 auto fifo_id
= "fifo"s
;
177 s::spawn(c
, [&](s::yield_context y
) {
178 auto r
= R::RADOS::Builder
{}.build(c
, y
);
179 auto pool
= create_pool(r
, get_temp_pool_name(), y
);
/* scope guard deletes the temporary pool */
180 auto sg
= make_scope_guard(
182 r
.delete_pool(pool
, y
);
184 R::IOContext
ioc(pool
);
185 auto fifo
= RCf::FIFO::create(r
, ioc
, fifo_id
, y
);
186 // force reading from backend
188 auto info
= fifo
->meta();
189 EXPECT_EQ(info
.id
, fifo_id
);
// Creates a FIFO with explicit parameters (objv with instance "fooz",
// oid prefix "foo.123.", max_part_size 10 KiB, max_entry_size 128) and checks
// that meta() reports the same id, max_part_size, max_entry_size and version.
// NOTE(review): the declaration of `objv`, the tail of the create() call and
// the statement after "force reading from backend" are not visible here.
194 TEST(FIFO
, TestOpenParams
) {
196 auto fifo_id
= "fifo"sv
;
198 s::spawn(c
, [&](s::yield_context y
) {
199 auto r
= R::RADOS::Builder
{}.build(c
, y
);
200 auto pool
= create_pool(r
, get_temp_pool_name(), y
);
/* scope guard deletes the temporary pool */
201 auto sg
= make_scope_guard(
203 r
.delete_pool(pool
, y
);
205 R::IOContext
ioc(pool
);
/* non-default parameters that are checked against meta() below */
207 const std::uint64_t max_part_size
= 10 * 1024;
208 const std::uint64_t max_entry_size
= 128;
209 auto oid_prefix
= "foo.123."sv
;
211 objv
.instance
= "fooz"s
;
214 /* first successful create */
215 auto f
= RCf::FIFO::create(r
, ioc
, fifo_id
, y
, objv
, oid_prefix
,
216 false, max_part_size
,
220 /* force reading from backend */
222 auto info
= f
->meta();
223 ASSERT_EQ(info
.id
, fifo_id
);
224 ASSERT_EQ(info
.params
.max_part_size
, max_part_size
);
225 ASSERT_EQ(info
.params
.max_entry_size
, max_entry_size
);
226 ASSERT_EQ(info
.version
, objv
);
// Helper used by the list tests: takes one RCf::list_entry and returns a pair
// of (value of type T, the entry's marker string).
// NOTE(review): the template header, the declaration of `val` and the step
// that fills `val` from `iter` are not visible in this listing -- presumably
// `val` is decoded from entry.data starting at `iter`; confirm.
233 std::pair
<T
, std::string
> decode_entry(const RCf::list_entry
& entry
)
/* iterator positioned at the start of the entry's payload */
236 auto iter
= entry
.data
.cbegin();
238 return std::make_pair(std::move(val
), entry
.marker
);
// End-to-end check of list and trim on a FIFO created with default
// parameters, using max_entries (10) entries:
//  - lists entries one at a time, following the returned marker, and expects
//    `more` to be true for every entry except the last;
//  - lists everything in one call (asking for max_entries * 10) and expects
//    exactly max_entries results, remembering each entry's marker;
//  - trims up to markers[min_entry] and lists again, expecting
//    max_entries - min_entry results.
// NOTE(review): the body of the first loop (presumably the pushes), the
// declarations of `val`, and any later assignment to `min_entry` are not
// visible in this listing.
244 TEST(FIFO
, TestPushListTrim
) {
246 auto fifo_id
= "fifo"sv
;
248 s::spawn(c
, [&](s::yield_context y
) mutable {
249 auto r
= R::RADOS::Builder
{}.build(c
, y
);
250 auto pool
= create_pool(r
, get_temp_pool_name(), y
);
/* scope guard deletes the temporary pool */
251 auto sg
= make_scope_guard(
253 r
.delete_pool(pool
, y
);
255 R::IOContext
ioc(pool
);
256 auto f
= RCf::FIFO::create(r
, ioc
, fifo_id
, y
);
257 static constexpr auto max_entries
= 10u;
258 for (uint32_t i
= 0; i
< max_entries
; ++i
) {
264 std::optional
<std::string
> marker
;
265 /* get entries one by one */
267 for (auto i
= 0u; i
< max_entries
; ++i
) {
268 auto [result
, more
] = f
->list(1, marker
, y
);
/* every entry but the last should report that more entries follow */
270 bool expected_more
= (i
!= (max_entries
- 1));
271 ASSERT_EQ(expected_more
, more
);
272 ASSERT_EQ(1, result
.size());
/* the marker returned here is used for the next list call */
275 std::tie(val
, marker
) =
276 decode_entry
<std::uint32_t>(result
.front());
281 /* get all entries at once */
282 std::string markers
[max_entries
];
283 std::uint32_t min_entry
= 0;
285 auto [result
, more
] = f
->list(max_entries
* 10, std::nullopt
,
289 ASSERT_EQ(max_entries
, result
.size());
292 for (auto i
= 0u; i
< max_entries
; ++i
) {
295 std::tie(val
, markers
[i
]) =
296 decode_entry
<std::uint32_t>(result
[i
]);
/* trim using the marker recorded for index min_entry */
302 f
->trim(markers
[min_entry
], false, y
);
306 auto [result
, more
] = f
->list(max_entries
* 10,
310 ASSERT_EQ(max_entries
- min_entry
, result
.size());
312 for (auto i
= min_entry
; i
< max_entries
; ++i
) {
315 std::tie(val
, markers
[i
- min_entry
]) =
316 decode_entry
<std::uint32_t>(result
[i
- min_entry
]);
// Checks the entry size limit: a FIFO is created with max_part_size 2048 and
// a buffer of max_entry_size + 1 zero bytes is prepared; the resulting error
// code is expected to be RCf::errc::entry_too_large.
// NOTE(review): the tail of the create() call, the declaration of `bl`/`ec`
// and the push call itself are not visible in this listing.
325 TEST(FIFO
, TestPushTooBig
) {
327 auto fifo_id
= "fifo"sv
;
328 static constexpr auto max_part_size
= 2048ull;
329 static constexpr auto max_entry_size
= 128ull;
331 s::spawn(c
, [&](s::yield_context y
) {
332 auto r
= R::RADOS::Builder
{}.build(c
, y
);
333 auto pool
= create_pool(r
, get_temp_pool_name(), y
);
/* scope guard deletes the temporary pool */
334 auto sg
= make_scope_guard(
336 r
.delete_pool(pool
, y
);
338 R::IOContext
ioc(pool
);
340 auto f
= RCf::FIFO::create(r
, ioc
, fifo_id
, y
, std::nullopt
,
341 std::nullopt
, false, max_part_size
,
/* payload is one byte larger than max_entry_size */
344 char buf
[max_entry_size
+ 1];
345 memset(buf
, 0, sizeof(buf
));
348 bl
.append(buf
, sizeof(buf
));
352 EXPECT_EQ(RCf::errc::entry_too_large
, ec
);
// Exercises a FIFO whose entries span several parts (max_part_size 2048,
// max_entry_size 128). entries_per_part is derived from the layout info
// returned by get_part_layout_info(), and max_entries is
// entries_per_part * 4 + 1. The test then:
//  - pushes max_entries entries and expects head_part_num > 0;
//  - lists everything at once and one entry at a time, checking that the
//    leading int of each payload equals its index;
//  - trims one entry at a time, checking tail_part_num against
//    i / entries_per_part and the number of entries still listable;
//  - finally expects head_part_num == tail_part_num, expects get_part_info()
//    on every part below the tail to report no_such_file_or_directory, and
//    queries the current tail part without capturing an error.
// NOTE(review): the push statements, the writes into `buf`, and several
// `info`/`marker`/`ec` declarations are not visible in this listing.
358 TEST(FIFO
, TestMultipleParts
) {
360 auto fifo_id
= "fifo"sv
;
361 static constexpr auto max_part_size
= 2048ull;
362 static constexpr auto max_entry_size
= 128ull;
364 s::spawn(c
, [&](s::yield_context y
) mutable {
365 auto r
= R::RADOS::Builder
{}.build(c
, y
);
366 auto pool
= create_pool(r
, get_temp_pool_name(), y
);
/* scope guard deletes the temporary pool */
367 auto sg
= make_scope_guard(
369 r
.delete_pool(pool
, y
);
371 R::IOContext
ioc(pool
);
373 auto f
= RCf::FIFO::create(r
, ioc
, fifo_id
, y
, std::nullopt
,
374 std::nullopt
, false, max_part_size
,
378 char buf
[max_entry_size
];
379 memset(buf
, 0, sizeof(buf
));
381 const auto [part_header_size
, part_entry_overhead
] =
382 f
->get_part_layout_info();
/* how many max-size entries fit in one part after header and per-entry overhead */
384 const auto entries_per_part
=
385 (max_part_size
- part_header_size
) /
386 (max_entry_size
+ part_entry_overhead
);
388 const auto max_entries
= entries_per_part
* 4 + 1;
390 /* push enough entries */
391 for (auto i
= 0u; i
< max_entries
; ++i
) {
395 bl
.append(buf
, sizeof(buf
));
400 auto info
= f
->meta();
402 ASSERT_EQ(info
.id
, fifo_id
);
403 /* head should have advanced */
404 ASSERT_GT(info
.head_part_num
, 0);
407 /* list all at once */
408 auto [result
, more
] = f
->list(max_entries
, std::nullopt
, y
);
409 EXPECT_EQ(false, more
);
411 ASSERT_EQ(max_entries
, result
.size());
413 for (auto i
= 0u; i
< max_entries
; ++i
) {
414 auto& bl
= result
[i
].data
;
/* the payload is read back as an int and compared with the index */
415 ASSERT_EQ(i
, *(int *)bl
.c_str());
418 std::optional
<std::string
> marker
;
419 /* get entries one by one */
421 for (auto i
= 0u; i
< max_entries
; ++i
) {
422 auto [result
, more
] = f
->list(1, marker
, y
);
423 ASSERT_EQ(result
.size(), 1);
424 const bool expected_more
= (i
!= (max_entries
- 1));
425 ASSERT_EQ(expected_more
, more
);
428 std::tie(val
, marker
) =
429 decode_entry
<std::uint32_t>(result
.front());
431 auto& entry
= result
.front();
432 auto& bl
= entry
.data
;
433 ASSERT_EQ(i
, *(int *)bl
.c_str());
434 marker
= entry
.marker
;
437 /* trim one at a time */
439 for (auto i
= 0u; i
< max_entries
; ++i
) {
440 /* read single entry */
442 auto [result
, more
] = f
->list(1, marker
, y
);
443 ASSERT_EQ(result
.size(), 1);
444 const bool expected_more
= (i
!= (max_entries
- 1));
445 ASSERT_EQ(expected_more
, more
);
447 marker
= result
.front().marker
;
449 f
->trim(*marker
, false, y
);
/* the tail part index is expected to track the number of trimmed entries */
454 ASSERT_EQ(info
.tail_part_num
, i
/ entries_per_part
);
456 /* try to read all again, see how many entries left */
457 auto [result
, more
] = f
->list(max_entries
, marker
, y
);
458 ASSERT_EQ(max_entries
- i
- 1, result
.size());
459 ASSERT_EQ(false, more
);
462 /* tail now should point at head */
464 ASSERT_EQ(info
.head_part_num
, info
.tail_part_num
);
466 /* check old tails are removed */
467 for (auto i
= 0; i
< info
.tail_part_num
; ++i
) {
469 f
->get_part_info(i
, y
[ec
]);
470 ASSERT_EQ(bs::errc::no_such_file_or_directory
, ec
);
472 /* check current tail exists */
473 f
->get_part_info(info
.tail_part_num
, y
);
// Two handles on the same FIFO: `f` from RCf::FIFO::create() and `f2` from
// RCf::FIFO::open(). The push loop selects a handle with
// fifos[i % fifos.size()], i.e. it alternates between the two. Afterwards the
// full listing is expected to contain max_entries entries, no `more` flag,
// and payloads whose leading int equals the entry index.
// NOTE(review): the push call and the writes into `buf` are not visible in
// this listing.
479 TEST(FIFO
, TestTwoPushers
) {
481 auto fifo_id
= "fifo"sv
;
482 static constexpr auto max_part_size
= 2048ull;
483 static constexpr auto max_entry_size
= 128ull;
485 s::spawn(c
, [&](s::yield_context y
) {
486 auto r
= R::RADOS::Builder
{}.build(c
, y
);
487 auto pool
= create_pool(r
, get_temp_pool_name(), y
);
/* scope guard deletes the temporary pool */
488 auto sg
= make_scope_guard(
490 r
.delete_pool(pool
, y
);
492 R::IOContext
ioc(pool
);
494 auto f
= RCf::FIFO::create(r
, ioc
, fifo_id
, y
, std::nullopt
,
495 std::nullopt
, false, max_part_size
,
500 char buf
[max_entry_size
];
501 memset(buf
, 0, sizeof(buf
));
504 auto [part_header_size
, part_entry_overhead
] =
505 f
->get_part_layout_info();
/* how many max-size entries fit in one part after header and per-entry overhead */
507 const auto entries_per_part
=
508 (max_part_size
- part_header_size
) /
509 (max_entry_size
+ part_entry_overhead
);
511 const auto max_entries
= entries_per_part
* 4 + 1;
/* second handle on the same FIFO */
513 auto f2
= RCf::FIFO::open(r
, ioc
, fifo_id
, y
);
515 std::vector fifos
{&f
, &f2
};
517 for (auto i
= 0u; i
< max_entries
; ++i
) {
520 bl
.append(buf
, sizeof(buf
));
/* alternate between the two handles; this `f` shadows the outer one */
522 auto& f
= fifos
[i
% fifos
.size()];
527 /* list all by both */
/* NOTE(review): both list calls below go through f2; given the comment
   above, one of them was possibly meant to use f -- confirm. */
529 auto [result
, more
] = f2
->list(max_entries
, std::nullopt
, y
);
531 ASSERT_EQ(false, more
);
532 ASSERT_EQ(max_entries
, result
.size());
534 auto [result
, more
] = f2
->list(max_entries
, std::nullopt
, y
);
535 ASSERT_EQ(false, more
);
536 ASSERT_EQ(max_entries
, result
.size());
538 for (auto i
= 0u; i
< max_entries
; ++i
) {
539 auto& bl
= result
[i
].data
;
/* the payload is read back as an int and compared with the index */
540 ASSERT_EQ(i
, *(int *)bl
.c_str());
// Two handles on the same FIFO (`f1` created, `f2` opened). The first entry
// is pushed through f2 and the rest through f1. Then:
//  - f1 lists the first half (num = max_entries / 2), expects `more` to be
//    true, checks the payload indices and trims up to the marker of entry
//    num - 1;
//  - f2 lists from that marker and expects exactly max_entries - num entries,
//    no `more` flag, and payload indices num .. max_entries - 1.
// NOTE(review): the push call, the writes into `buf` and the declaration of
// `marker` are not visible in this listing.
547 TEST(FIFO
, TestTwoPushersTrim
) {
549 auto fifo_id
= "fifo"sv
;
550 static constexpr auto max_part_size
= 2048ull;
551 static constexpr auto max_entry_size
= 128ull;
553 s::spawn(c
, [&](s::yield_context y
) {
554 auto r
= R::RADOS::Builder
{}.build(c
, y
);
555 auto pool
= create_pool(r
, get_temp_pool_name(), y
);
/* scope guard deletes the temporary pool */
556 auto sg
= make_scope_guard(
558 r
.delete_pool(pool
, y
);
560 R::IOContext
ioc(pool
);
562 auto f1
= RCf::FIFO::create(r
, ioc
, fifo_id
, y
, std::nullopt
,
563 std::nullopt
, false, max_part_size
,
566 char buf
[max_entry_size
];
567 memset(buf
, 0, sizeof(buf
));
570 auto [part_header_size
, part_entry_overhead
] =
571 f1
->get_part_layout_info();
/* how many max-size entries fit in one part after header and per-entry overhead */
573 const auto entries_per_part
=
574 (max_part_size
- part_header_size
) /
575 (max_entry_size
+ part_entry_overhead
);
577 const auto max_entries
= entries_per_part
* 4 + 1;
/* second handle on the same FIFO */
579 auto f2
= RCf::FIFO::open(r
, ioc
, fifo_id
, y
);
581 /* push one entry to f2 and the rest to f1 */
583 for (auto i
= 0u; i
< max_entries
; ++i
) {
587 bl
.append(buf
, sizeof(buf
));
589 auto f
= (i
< 1 ? &f2
: &f1
);
593 /* trim half by fifo1 */
594 auto num
= max_entries
/ 2;
598 auto [result
, more
] = f1
->list(num
, std::nullopt
, y
);
600 ASSERT_EQ(true, more
);
601 ASSERT_EQ(num
, result
.size());
603 for (auto i
= 0u; i
< num
; ++i
) {
604 auto& bl
= result
[i
].data
;
605 ASSERT_EQ(i
, *(int *)bl
.c_str());
/* trim up to and using the marker of the last entry in the first half */
608 auto& entry
= result
[num
- 1];
609 marker
= entry
.marker
;
611 f1
->trim(marker
, false, y
);
613 /* list what's left by fifo2 */
617 const auto left
= max_entries
- num
;
618 auto [result
, more
] = f2
->list(left
, marker
, y
);
619 ASSERT_EQ(left
, result
.size());
620 ASSERT_EQ(false, more
);
622 for (auto i
= num
; i
< max_entries
; ++i
) {
623 auto& bl
= result
[i
- num
].data
;
624 ASSERT_EQ(i
, *(int *)bl
.c_str());
// Builds a vector of buffers (`bufs`) sized to span several parts
// (max_entries = entries_per_part * 4 + 1), then lists the FIFO and expects
// max_entries entries, no `more` flag, payloads whose leading int equals the
// entry index, and a head_part_num of 4 in meta().
// NOTE(review): the writes into `buf`, the insertion into `bufs` and the
// batch push call itself are not visible in this listing.
630 TEST(FIFO
, TestPushBatch
) {
632 auto fifo_id
= "fifo"sv
;
633 static constexpr auto max_part_size
= 2048ull;
634 static constexpr auto max_entry_size
= 128ull;
636 s::spawn(c
, [&](s::yield_context y
) {
637 auto r
= R::RADOS::Builder
{}.build(c
, y
);
638 auto pool
= create_pool(r
, get_temp_pool_name(), y
);
/* scope guard deletes the temporary pool */
639 auto sg
= make_scope_guard(
641 r
.delete_pool(pool
, y
);
643 R::IOContext
ioc(pool
);
645 auto f
= RCf::FIFO::create(r
, ioc
, fifo_id
, y
, std::nullopt
,
646 std::nullopt
, false, max_part_size
,
650 char buf
[max_entry_size
];
651 memset(buf
, 0, sizeof(buf
));
653 auto [part_header_size
, part_entry_overhead
]
654 = f
->get_part_layout_info();
/* how many max-size entries fit in one part after header and per-entry overhead */
656 auto entries_per_part
=
657 (max_part_size
- part_header_size
) /
658 (max_entry_size
+ part_entry_overhead
);
660 auto max_entries
= entries_per_part
* 4 + 1; /* enough entries to span multiple parts */
662 std::vector
<cb::list
> bufs
;
664 for (auto i
= 0u; i
< max_entries
; ++i
) {
668 bl
.append(buf
, sizeof(buf
));
/* read everything back in a single list call */
677 auto [result
, more
] = f
->list(max_entries
, std::nullopt
, y
);
678 ASSERT_EQ(false, more
);
679 ASSERT_EQ(max_entries
, result
.size());
681 for (auto i
= 0u; i
< max_entries
; ++i
) {
682 auto& bl
= result
[i
].data
;
/* the payload is read back as an int and compared with the index */
683 ASSERT_EQ(i
, *(int *)bl
.c_str());
/* with entries_per_part * 4 + 1 entries the head is expected at part 4 */
686 auto& info
= f
->meta();
687 ASSERT_EQ(info
.head_part_num
, 4);
692 TEST(FIFO
, TestTrimExclusive
) {
694 auto fifo_id
= "fifo"sv
;
696 s::spawn(c
, [&](s::yield_context y
) mutable {
697 auto r
= R::RADOS::Builder
{}.build(c
, y
);
698 auto pool
= create_pool(r
, get_temp_pool_name(), y
);
699 auto sg
= make_scope_guard(
701 r
.delete_pool(pool
, y
);
703 R::IOContext
ioc(pool
);
704 auto f
= RCf::FIFO::create(r
, ioc
, fifo_id
, y
);
705 static constexpr auto max_entries
= 10u;
706 for (uint32_t i
= 0; i
< max_entries
; ++i
) {
713 auto [result
, more
] = f
->list(1, std::nullopt
, y
);
715 decode_entry
<std::uint32_t>(result
.front());
717 f
->trim(marker
, true, y
);
720 auto [result
, more
] = f
->list(max_entries
, std::nullopt
, y
);
721 auto [val
, marker
] = decode_entry
<std::uint32_t>(result
.front());
723 f
->trim(result
[4].marker
, true, y
);
726 auto [result
, more
] = f
->list(max_entries
, std::nullopt
, y
);
728 decode_entry
<std::uint32_t>(result
.front());
730 f
->trim(result
.back().marker
, true, y
);
733 auto [result
, more
] = f
->list(max_entries
, std::nullopt
, y
);
735 decode_entry
<std::uint32_t>(result
.front());
736 ASSERT_EQ(result
.size(), 1);
737 ASSERT_EQ(max_entries
- 1, val
);