]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/rgw/test_cls_fifo_legacy.cc
26d9e9a9253e4e67f069481d75b3c8c49fac7ed0
[ceph.git] / ceph / src / test / rgw / test_cls_fifo_legacy.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2019 Red Hat, Inc.
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <cerrno>
16 #include <iostream>
17 #include <string_view>
18
19 #include "include/scope_guard.h"
20 #include "include/types.h"
21 #include "include/rados/librados.hpp"
22
23 #include "cls/fifo/cls_fifo_ops.h"
24 #include "test/librados/test_cxx.h"
25 #include "global/global_context.h"
26
27 #include "rgw/rgw_tools.h"
28 #include "rgw/cls_fifo_legacy.h"
29
30 #include "gtest/gtest.h"
31
32 namespace R = librados;
33 namespace cb = ceph::buffer;
34 namespace fifo = rados::cls::fifo;
35 namespace RCf = rgw::cls::fifo;
36
37 namespace {
38 int fifo_create(R::IoCtx& ioctx,
39 const std::string& oid,
40 std::string_view id,
41 optional_yield y,
42 std::optional<fifo::objv> objv = std::nullopt,
43 std::optional<std::string_view> oid_prefix = std::nullopt,
44 bool exclusive = false,
45 std::uint64_t max_part_size = RCf::default_max_part_size,
46 std::uint64_t max_entry_size = RCf::default_max_entry_size)
47 {
48 R::ObjectWriteOperation op;
49 RCf::create_meta(&op, id, objv, oid_prefix, exclusive, max_part_size,
50 max_entry_size);
51 return rgw_rados_operate(ioctx, oid, &op, y);
52 }
53 }
54
55 class LegacyFIFO : public testing::Test {
56 protected:
57 const std::string pool_name = get_temp_pool_name();
58 const std::string fifo_id = "fifo";
59 R::Rados rados;
60 librados::IoCtx ioctx;
61
62 void SetUp() override {
63 ASSERT_EQ("", create_one_pool_pp(pool_name, rados));
64 ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx));
65 }
66 void TearDown() override {
67 destroy_one_pool_pp(pool_name, rados);
68 }
69 };
70
71 using LegacyClsFIFO = LegacyFIFO;
72 using AioLegacyFIFO = LegacyFIFO;
73
74
75 TEST_F(LegacyClsFIFO, TestCreate)
76 {
77 auto r = fifo_create(ioctx, fifo_id, ""s, null_yield);
78 EXPECT_EQ(-EINVAL, r);
79 r = fifo_create(ioctx, fifo_id, fifo_id, null_yield, std::nullopt,
80 std::nullopt, false, 0);
81 EXPECT_EQ(-EINVAL, r);
82 r = fifo_create(ioctx, fifo_id, {}, null_yield,
83 std::nullopt, std::nullopt,
84 false, RCf::default_max_part_size, 0);
85 EXPECT_EQ(-EINVAL, r);
86 r = fifo_create(ioctx, fifo_id, fifo_id, null_yield);
87 EXPECT_EQ(0, r);
88 std::uint64_t size;
89 ioctx.stat(fifo_id, &size, nullptr);
90 EXPECT_GT(size, 0);
91 /* test idempotency */
92 r = fifo_create(ioctx, fifo_id, fifo_id, null_yield);
93 EXPECT_EQ(0, r);
94 r = fifo_create(ioctx, fifo_id, {}, null_yield, std::nullopt,
95 std::nullopt, false);
96 EXPECT_EQ(-EINVAL, r);
97 r = fifo_create(ioctx, fifo_id, {}, null_yield, std::nullopt,
98 "myprefix"sv, false);
99 EXPECT_EQ(-EINVAL, r);
100 r = fifo_create(ioctx, fifo_id, "foo"sv, null_yield,
101 std::nullopt, std::nullopt, false);
102 EXPECT_EQ(-EEXIST, r);
103 }
104
105 TEST_F(LegacyClsFIFO, TestGetInfo)
106 {
107 auto r = fifo_create(ioctx, fifo_id, fifo_id, null_yield);
108 fifo::info info;
109 std::uint32_t part_header_size;
110 std::uint32_t part_entry_overhead;
111 r = RCf::get_meta(ioctx, fifo_id, std::nullopt, &info, &part_header_size,
112 &part_entry_overhead, 0, null_yield);
113 EXPECT_EQ(0, r);
114 EXPECT_GT(part_header_size, 0);
115 EXPECT_GT(part_entry_overhead, 0);
116 EXPECT_FALSE(info.version.instance.empty());
117
118 r = RCf::get_meta(ioctx, fifo_id, info.version, &info, &part_header_size,
119 &part_entry_overhead, 0, null_yield);
120 EXPECT_EQ(0, r);
121 fifo::objv objv;
122 objv.instance = "foo";
123 objv.ver = 12;
124 r = RCf::get_meta(ioctx, fifo_id, objv, &info, &part_header_size,
125 &part_entry_overhead, 0, null_yield);
126 EXPECT_EQ(-ECANCELED, r);
127 }
128
129 TEST_F(LegacyFIFO, TestOpenDefault)
130 {
131 std::unique_ptr<RCf::FIFO> fifo;
132 auto r = RCf::FIFO::create(ioctx, fifo_id, &fifo, null_yield);
133 ASSERT_EQ(0, r);
134 // force reading from backend
135 r = fifo->read_meta(null_yield);
136 EXPECT_EQ(0, r);
137 auto info = fifo->meta();
138 EXPECT_EQ(info.id, fifo_id);
139 }
140
141 TEST_F(LegacyFIFO, TestOpenParams)
142 {
143 const std::uint64_t max_part_size = 10 * 1024;
144 const std::uint64_t max_entry_size = 128;
145 auto oid_prefix = "foo.123."sv;
146 fifo::objv objv;
147 objv.instance = "fooz"s;
148 objv.ver = 10;
149
150 /* first successful create */
151 std::unique_ptr<RCf::FIFO> f;
152 auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, objv, oid_prefix,
153 false, max_part_size, max_entry_size);
154 ASSERT_EQ(0, r);
155
156 /* force reading from backend */
157 r = f->read_meta(null_yield);
158 auto info = f->meta();
159 EXPECT_EQ(info.id, fifo_id);
160 EXPECT_EQ(info.params.max_part_size, max_part_size);
161 EXPECT_EQ(info.params.max_entry_size, max_entry_size);
162 EXPECT_EQ(info.version, objv);
163 }
164
165 namespace {
166 template<class T>
167 std::pair<T, std::string> decode_entry(const RCf::list_entry& entry)
168 {
169 T val;
170 auto iter = entry.data.cbegin();
171 decode(val, iter);
172 return std::make_pair(std::move(val), entry.marker);
173 }
174 }
175
176
177 TEST_F(LegacyFIFO, TestPushListTrim)
178 {
179 std::unique_ptr<RCf::FIFO> f;
180 auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield);
181 ASSERT_EQ(0, r);
182 static constexpr auto max_entries = 10u;
183 for (uint32_t i = 0; i < max_entries; ++i) {
184 cb::list bl;
185 encode(i, bl);
186 r = f->push(bl, null_yield);
187 ASSERT_EQ(0, r);
188 }
189
190 std::optional<std::string> marker;
191 /* get entries one by one */
192 std::vector<RCf::list_entry> result;
193 bool more = false;
194 for (auto i = 0u; i < max_entries; ++i) {
195
196 r = f->list(1, marker, &result, &more, null_yield);
197 ASSERT_EQ(0, r);
198
199 bool expected_more = (i != (max_entries - 1));
200 ASSERT_EQ(expected_more, more);
201 ASSERT_EQ(1, result.size());
202
203 std::uint32_t val;
204 std::tie(val, marker) = decode_entry<std::uint32_t>(result.front());
205
206 ASSERT_EQ(i, val);
207 result.clear();
208 }
209
210 /* get all entries at once */
211 std::string markers[max_entries];
212 std::uint32_t min_entry = 0;
213 r = f->list(max_entries * 10, std::nullopt, &result, &more, null_yield);
214 ASSERT_EQ(0, r);
215
216 ASSERT_FALSE(more);
217 ASSERT_EQ(max_entries, result.size());
218 for (auto i = 0u; i < max_entries; ++i) {
219 std::uint32_t val;
220 std::tie(val, markers[i]) = decode_entry<std::uint32_t>(result[i]);
221 ASSERT_EQ(i, val);
222 }
223
224 /* trim one entry */
225 r = f->trim(markers[min_entry], false, null_yield);
226 ASSERT_EQ(0, r);
227 ++min_entry;
228
229 r = f->list(max_entries * 10, std::nullopt, &result, &more, null_yield);
230 ASSERT_EQ(0, r);
231 ASSERT_FALSE(more);
232 ASSERT_EQ(max_entries - min_entry, result.size());
233
234 for (auto i = min_entry; i < max_entries; ++i) {
235 std::uint32_t val;
236 std::tie(val, markers[i - min_entry]) =
237 decode_entry<std::uint32_t>(result[i - min_entry]);
238 EXPECT_EQ(i, val);
239 }
240 }
241
242
243 TEST_F(LegacyFIFO, TestPushTooBig)
244 {
245 static constexpr auto max_part_size = 2048ull;
246 static constexpr auto max_entry_size = 128ull;
247
248 std::unique_ptr<RCf::FIFO> f;
249 auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt,
250 std::nullopt, false, max_part_size, max_entry_size);
251 ASSERT_EQ(0, r);
252
253 char buf[max_entry_size + 1];
254 memset(buf, 0, sizeof(buf));
255
256 cb::list bl;
257 bl.append(buf, sizeof(buf));
258
259 r = f->push(bl, null_yield);
260 EXPECT_EQ(-E2BIG, r);
261 }
262
263
264 TEST_F(LegacyFIFO, TestMultipleParts)
265 {
266 static constexpr auto max_part_size = 2048ull;
267 static constexpr auto max_entry_size = 128ull;
268 std::unique_ptr<RCf::FIFO> f;
269 auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt,
270 std::nullopt, false, max_part_size,
271 max_entry_size);
272 ASSERT_EQ(0, r);
273
274 char buf[max_entry_size];
275 memset(buf, 0, sizeof(buf));
276 const auto [part_header_size, part_entry_overhead] =
277 f->get_part_layout_info();
278 const auto entries_per_part = ((max_part_size - part_header_size) /
279 (max_entry_size + part_entry_overhead));
280 const auto max_entries = entries_per_part * 4 + 1;
281 /* push enough entries */
282 for (auto i = 0u; i < max_entries; ++i) {
283 cb::list bl;
284 *(int *)buf = i;
285 bl.append(buf, sizeof(buf));
286 r = f->push(bl, null_yield);
287 ASSERT_EQ(0, r);
288 }
289
290 auto info = f->meta();
291 ASSERT_EQ(info.id, fifo_id);
292 /* head should have advanced */
293 ASSERT_GT(info.head_part_num, 0);
294
295 /* list all at once */
296 std::vector<RCf::list_entry> result;
297 bool more = false;
298 r = f->list(max_entries, std::nullopt, &result, &more, null_yield);
299 ASSERT_EQ(0, r);
300 EXPECT_EQ(false, more);
301 ASSERT_EQ(max_entries, result.size());
302
303 for (auto i = 0u; i < max_entries; ++i) {
304 auto& bl = result[i].data;
305 ASSERT_EQ(i, *(int *)bl.c_str());
306 }
307
308 std::optional<std::string> marker;
309 /* get entries one by one */
310
311 for (auto i = 0u; i < max_entries; ++i) {
312 r = f->list(1, marker, &result, &more, null_yield);
313 ASSERT_EQ(0, r);
314 ASSERT_EQ(result.size(), 1);
315 const bool expected_more = (i != (max_entries - 1));
316 ASSERT_EQ(expected_more, more);
317
318 std::uint32_t val;
319 std::tie(val, marker) = decode_entry<std::uint32_t>(result.front());
320
321 auto& entry = result.front();
322 auto& bl = entry.data;
323 ASSERT_EQ(i, *(int *)bl.c_str());
324 marker = entry.marker;
325 }
326
327 /* trim one at a time */
328 marker.reset();
329 for (auto i = 0u; i < max_entries; ++i) {
330 /* read single entry */
331 r = f->list(1, marker, &result, &more, null_yield);
332 ASSERT_EQ(0, r);
333 ASSERT_EQ(result.size(), 1);
334 const bool expected_more = (i != (max_entries - 1));
335 ASSERT_EQ(expected_more, more);
336
337 marker = result.front().marker;
338 r = f->trim(*marker, false, null_yield);
339 ASSERT_EQ(0, r);
340
341 /* check tail */
342 info = f->meta();
343 ASSERT_EQ(info.tail_part_num, i / entries_per_part);
344
345 /* try to read all again, see how many entries left */
346 r = f->list(max_entries, marker, &result, &more, null_yield);
347 ASSERT_EQ(max_entries - i - 1, result.size());
348 ASSERT_EQ(false, more);
349 }
350
351 /* tail now should point at head */
352 info = f->meta();
353 ASSERT_EQ(info.head_part_num, info.tail_part_num);
354
355 RCf::part_info partinfo;
356 /* check old tails are removed */
357 for (auto i = 0; i < info.tail_part_num; ++i) {
358 r = f->get_part_info(i, &partinfo, null_yield);
359 ASSERT_EQ(-ENOENT, r);
360 }
361 /* check current tail exists */
362 r = f->get_part_info(info.tail_part_num, &partinfo, null_yield);
363 ASSERT_EQ(0, r);
364 }
365
366 TEST_F(LegacyFIFO, TestTwoPushers)
367 {
368 static constexpr auto max_part_size = 2048ull;
369 static constexpr auto max_entry_size = 128ull;
370
371 std::unique_ptr<RCf::FIFO> f;
372 auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt,
373 std::nullopt, false, max_part_size,
374 max_entry_size);
375 ASSERT_EQ(0, r);
376 char buf[max_entry_size];
377 memset(buf, 0, sizeof(buf));
378
379 auto [part_header_size, part_entry_overhead] = f->get_part_layout_info();
380 const auto entries_per_part = ((max_part_size - part_header_size) /
381 (max_entry_size + part_entry_overhead));
382 const auto max_entries = entries_per_part * 4 + 1;
383 std::unique_ptr<RCf::FIFO> f2;
384 r = RCf::FIFO::open(ioctx, fifo_id, &f2, null_yield);
385 std::vector fifos{&f, &f2};
386
387 for (auto i = 0u; i < max_entries; ++i) {
388 cb::list bl;
389 *(int *)buf = i;
390 bl.append(buf, sizeof(buf));
391 auto& f = *fifos[i % fifos.size()];
392 r = f->push(bl, null_yield);
393 ASSERT_EQ(0, r);
394 }
395
396 /* list all by both */
397 std::vector<RCf::list_entry> result;
398 bool more = false;
399 r = f2->list(max_entries, std::nullopt, &result, &more, null_yield);
400 ASSERT_EQ(0, r);
401 ASSERT_EQ(false, more);
402 ASSERT_EQ(max_entries, result.size());
403
404 r = f2->list(max_entries, std::nullopt, &result, &more, null_yield);
405 ASSERT_EQ(0, r);
406 ASSERT_EQ(false, more);
407 ASSERT_EQ(max_entries, result.size());
408
409 for (auto i = 0u; i < max_entries; ++i) {
410 auto& bl = result[i].data;
411 ASSERT_EQ(i, *(int *)bl.c_str());
412 }
413 }
414
415 TEST_F(LegacyFIFO, TestTwoPushersTrim)
416 {
417 static constexpr auto max_part_size = 2048ull;
418 static constexpr auto max_entry_size = 128ull;
419 std::unique_ptr<RCf::FIFO> f1;
420 auto r = RCf::FIFO::create(ioctx, fifo_id, &f1, null_yield, std::nullopt,
421 std::nullopt, false, max_part_size,
422 max_entry_size);
423 ASSERT_EQ(0, r);
424
425 char buf[max_entry_size];
426 memset(buf, 0, sizeof(buf));
427
428 auto [part_header_size, part_entry_overhead] = f1->get_part_layout_info();
429 const auto entries_per_part = ((max_part_size - part_header_size) /
430 (max_entry_size + part_entry_overhead));
431 const auto max_entries = entries_per_part * 4 + 1;
432
433 std::unique_ptr<RCf::FIFO> f2;
434 r = RCf::FIFO::open(ioctx, fifo_id, &f2, null_yield);
435 ASSERT_EQ(0, r);
436
437 /* push one entry to f2 and the rest to f1 */
438 for (auto i = 0u; i < max_entries; ++i) {
439 cb::list bl;
440 *(int *)buf = i;
441 bl.append(buf, sizeof(buf));
442 auto& f = (i < 1 ? f2 : f1);
443 r = f->push(bl, null_yield);
444 ASSERT_EQ(0, r);
445 }
446
447 /* trim half by fifo1 */
448 auto num = max_entries / 2;
449 std::string marker;
450 std::vector<RCf::list_entry> result;
451 bool more = false;
452 r = f1->list(num, std::nullopt, &result, &more, null_yield);
453 ASSERT_EQ(0, r);
454 ASSERT_EQ(true, more);
455 ASSERT_EQ(num, result.size());
456
457 for (auto i = 0u; i < num; ++i) {
458 auto& bl = result[i].data;
459 ASSERT_EQ(i, *(int *)bl.c_str());
460 }
461
462 auto& entry = result[num - 1];
463 marker = entry.marker;
464 r = f1->trim(marker, false, null_yield);
465 /* list what's left by fifo2 */
466
467 const auto left = max_entries - num;
468 f2->list(left, marker, &result, &more, null_yield);
469 ASSERT_EQ(left, result.size());
470 ASSERT_EQ(false, more);
471
472 for (auto i = num; i < max_entries; ++i) {
473 auto& bl = result[i - num].data;
474 ASSERT_EQ(i, *(int *)bl.c_str());
475 }
476 }
477
478 TEST_F(LegacyFIFO, TestPushBatch)
479 {
480 static constexpr auto max_part_size = 2048ull;
481 static constexpr auto max_entry_size = 128ull;
482
483 std::unique_ptr<RCf::FIFO> f;
484 auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt,
485 std::nullopt, false, max_part_size,
486 max_entry_size);
487 ASSERT_EQ(0, r);
488
489 char buf[max_entry_size];
490 memset(buf, 0, sizeof(buf));
491 auto [part_header_size, part_entry_overhead] = f->get_part_layout_info();
492 auto entries_per_part = ((max_part_size - part_header_size) /
493 (max_entry_size + part_entry_overhead));
494 auto max_entries = entries_per_part * 4 + 1; /* enough entries to span multiple parts */
495 std::vector<cb::list> bufs;
496 for (auto i = 0u; i < max_entries; ++i) {
497 cb::list bl;
498 *(int *)buf = i;
499 bl.append(buf, sizeof(buf));
500 bufs.push_back(bl);
501 }
502 ASSERT_EQ(max_entries, bufs.size());
503
504 r = f->push(bufs, null_yield);
505 ASSERT_EQ(0, r);
506
507 /* list all */
508
509 std::vector<RCf::list_entry> result;
510 bool more = false;
511 r = f->list(max_entries, std::nullopt, &result, &more, null_yield);
512 ASSERT_EQ(0, r);
513 ASSERT_EQ(false, more);
514 ASSERT_EQ(max_entries, result.size());
515 for (auto i = 0u; i < max_entries; ++i) {
516 auto& bl = result[i].data;
517 ASSERT_EQ(i, *(int *)bl.c_str());
518 }
519 auto& info = f->meta();
520 ASSERT_EQ(info.head_part_num, 4);
521 }
522
523 TEST_F(LegacyFIFO, TestAioTrim)
524 {
525 static constexpr auto max_part_size = 2048ull;
526 static constexpr auto max_entry_size = 128ull;
527 std::unique_ptr<RCf::FIFO> f;
528 auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt,
529 std::nullopt, false, max_part_size,
530 max_entry_size);
531 ASSERT_EQ(0, r);
532
533 char buf[max_entry_size];
534 memset(buf, 0, sizeof(buf));
535 const auto [part_header_size, part_entry_overhead] =
536 f->get_part_layout_info();
537 const auto entries_per_part = ((max_part_size - part_header_size) /
538 (max_entry_size + part_entry_overhead));
539 const auto max_entries = entries_per_part * 4 + 1;
540 /* push enough entries */
541 std::vector<cb::list> bufs;
542 for (auto i = 0u; i < max_entries; ++i) {
543 cb::list bl;
544 *(int *)buf = i;
545 bl.append(buf, sizeof(buf));
546 bufs.push_back(std::move(bl));
547 }
548 ASSERT_EQ(max_entries, bufs.size());
549
550 r = f->push(bufs, null_yield);
551 ASSERT_EQ(0, r);
552
553 auto info = f->meta();
554 ASSERT_EQ(info.id, fifo_id);
555 /* head should have advanced */
556 ASSERT_GT(info.head_part_num, 0);
557
558 /* list all at once */
559 std::vector<RCf::list_entry> result;
560 bool more = false;
561 r = f->list(max_entries, std::nullopt, &result, &more, null_yield);
562 ASSERT_EQ(0, r);
563 ASSERT_EQ(false, more);
564 ASSERT_EQ(max_entries, result.size());
565
566 std::optional<std::string> marker;
567 /* trim one at a time */
568 result.clear();
569 more = false;
570 marker.reset();
571 for (auto i = 0u; i < max_entries; ++i) {
572 /* read single entry */
573 r = f->list(1, marker, &result, &more, null_yield);
574 ASSERT_EQ(0, r);
575 ASSERT_EQ(result.size(), 1);
576 const bool expected_more = (i != (max_entries - 1));
577 ASSERT_EQ(expected_more, more);
578
579 marker = result.front().marker;
580 std::unique_ptr<R::AioCompletion> c(rados.aio_create_completion(nullptr,
581 nullptr));
582 f->trim(*marker, false, c.get());
583 c->wait_for_complete();
584 r = c->get_return_value();
585 ASSERT_EQ(0, r);
586
587 /* check tail */
588 info = f->meta();
589 ASSERT_EQ(info.tail_part_num, i / entries_per_part);
590
591 /* try to read all again, see how many entries left */
592 r = f->list(max_entries, marker, &result, &more, null_yield);
593 ASSERT_EQ(max_entries - i - 1, result.size());
594 ASSERT_EQ(false, more);
595 }
596
597 /* tail now should point at head */
598 info = f->meta();
599 ASSERT_EQ(info.head_part_num, info.tail_part_num);
600
601 RCf::part_info partinfo;
602 /* check old tails are removed */
603 for (auto i = 0; i < info.tail_part_num; ++i) {
604 r = f->get_part_info(i, &partinfo, null_yield);
605 ASSERT_EQ(-ENOENT, r);
606 }
607 /* check current tail exists */
608 r = f->get_part_info(info.tail_part_num, &partinfo, null_yield);
609 ASSERT_EQ(0, r);
610 }
611
612 TEST_F(LegacyFIFO, TestTrimExclusive) {
613 std::unique_ptr<RCf::FIFO> f;
614 auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield);
615 ASSERT_EQ(0, r);
616 std::vector<RCf::list_entry> result;
617 bool more = false;
618
619 static constexpr auto max_entries = 10u;
620 for (uint32_t i = 0; i < max_entries; ++i) {
621 cb::list bl;
622 encode(i, bl);
623 f->push(bl, null_yield);
624 }
625
626 f->list(1, std::nullopt, &result, &more, null_yield);
627 auto [val, marker] = decode_entry<std::uint32_t>(result.front());
628 ASSERT_EQ(0, val);
629 f->trim(marker, true, null_yield);
630
631 result.clear();
632 f->list(max_entries, std::nullopt, &result, &more, null_yield);
633 std::tie(val, marker) = decode_entry<std::uint32_t>(result.front());
634 ASSERT_EQ(0, val);
635 f->trim(result[4].marker, true, null_yield);
636
637 result.clear();
638 f->list(max_entries, std::nullopt, &result, &more, null_yield);
639 std::tie(val, marker) = decode_entry<std::uint32_t>(result.front());
640 ASSERT_EQ(4, val);
641 f->trim(result.back().marker, true, null_yield);
642
643 result.clear();
644 f->list(max_entries, std::nullopt, &result, &more, null_yield);
645 std::tie(val, marker) = decode_entry<std::uint32_t>(result.front());
646 ASSERT_EQ(result.size(), 1);
647 ASSERT_EQ(max_entries - 1, val);
648 }
649
650 TEST_F(AioLegacyFIFO, TestPushListTrim)
651 {
652 std::unique_ptr<RCf::FIFO> f;
653 auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield);
654 ASSERT_EQ(0, r);
655 static constexpr auto max_entries = 10u;
656 for (uint32_t i = 0; i < max_entries; ++i) {
657 cb::list bl;
658 encode(i, bl);
659 auto c = R::Rados::aio_create_completion();
660 f->push(bl, c);
661 c->wait_for_complete();
662 r = c->get_return_value();
663 c->release();
664 ASSERT_EQ(0, r);
665 }
666
667 std::optional<std::string> marker;
668 /* get entries one by one */
669 std::vector<RCf::list_entry> result;
670 bool more = false;
671 for (auto i = 0u; i < max_entries; ++i) {
672 auto c = R::Rados::aio_create_completion();
673 f->list(1, marker, &result, &more, c);
674 c->wait_for_complete();
675 r = c->get_return_value();
676 c->release();
677 ASSERT_EQ(0, r);
678
679 bool expected_more = (i != (max_entries - 1));
680 ASSERT_EQ(expected_more, more);
681 ASSERT_EQ(1, result.size());
682
683 std::uint32_t val;
684 std::tie(val, marker) = decode_entry<std::uint32_t>(result.front());
685
686 ASSERT_EQ(i, val);
687 result.clear();
688 }
689
690 /* get all entries at once */
691 std::string markers[max_entries];
692 std::uint32_t min_entry = 0;
693 auto c = R::Rados::aio_create_completion();
694 f->list(max_entries * 10, std::nullopt, &result, &more, c);
695 c->wait_for_complete();
696 r = c->get_return_value();
697 c->release();
698 ASSERT_EQ(0, r);
699
700 ASSERT_FALSE(more);
701 ASSERT_EQ(max_entries, result.size());
702 for (auto i = 0u; i < max_entries; ++i) {
703 std::uint32_t val;
704 std::tie(val, markers[i]) = decode_entry<std::uint32_t>(result[i]);
705 ASSERT_EQ(i, val);
706 }
707
708 /* trim one entry */
709 c = R::Rados::aio_create_completion();
710 f->trim(markers[min_entry], false, c);
711 c->wait_for_complete();
712 r = c->get_return_value();
713 c->release();
714 ASSERT_EQ(0, r);
715 ++min_entry;
716
717 c = R::Rados::aio_create_completion();
718 f->list(max_entries * 10, std::nullopt, &result, &more, c);
719 c->wait_for_complete();
720 r = c->get_return_value();
721 c->release();
722 ASSERT_EQ(0, r);
723 ASSERT_FALSE(more);
724 ASSERT_EQ(max_entries - min_entry, result.size());
725
726 for (auto i = min_entry; i < max_entries; ++i) {
727 std::uint32_t val;
728 std::tie(val, markers[i - min_entry]) =
729 decode_entry<std::uint32_t>(result[i - min_entry]);
730 EXPECT_EQ(i, val);
731 }
732 }
733
734
735 TEST_F(AioLegacyFIFO, TestPushTooBig)
736 {
737 static constexpr auto max_part_size = 2048ull;
738 static constexpr auto max_entry_size = 128ull;
739
740 std::unique_ptr<RCf::FIFO> f;
741 auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt,
742 std::nullopt, false, max_part_size, max_entry_size);
743 ASSERT_EQ(0, r);
744
745 char buf[max_entry_size + 1];
746 memset(buf, 0, sizeof(buf));
747
748 cb::list bl;
749 bl.append(buf, sizeof(buf));
750
751 auto c = R::Rados::aio_create_completion();
752 f->push(bl, c);
753 c->wait_for_complete();
754 r = c->get_return_value();
755 ASSERT_EQ(-E2BIG, r);
756 c->release();
757
758 c = R::Rados::aio_create_completion();
759 f->push(std::vector<cb::list>{}, c);
760 c->wait_for_complete();
761 r = c->get_return_value();
762 c->release();
763 EXPECT_EQ(0, r);
764 }
765
766
767 TEST_F(AioLegacyFIFO, TestMultipleParts)
768 {
769 static constexpr auto max_part_size = 2048ull;
770 static constexpr auto max_entry_size = 128ull;
771 std::unique_ptr<RCf::FIFO> f;
772 auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt,
773 std::nullopt, false, max_part_size,
774 max_entry_size);
775 ASSERT_EQ(0, r);
776
777 {
778 auto c = R::Rados::aio_create_completion();
779 f->get_head_info([&](int r, RCf::part_info&& p) {
780 ASSERT_TRUE(p.tag.empty());
781 ASSERT_EQ(0, p.magic);
782 ASSERT_EQ(0, p.min_ofs);
783 ASSERT_EQ(0, p.last_ofs);
784 ASSERT_EQ(0, p.next_ofs);
785 ASSERT_EQ(0, p.min_index);
786 ASSERT_EQ(0, p.max_index);
787 ASSERT_EQ(ceph::real_time{}, p.max_time);
788 }, c);
789 c->wait_for_complete();
790 r = c->get_return_value();
791 c->release();
792 }
793
794 char buf[max_entry_size];
795 memset(buf, 0, sizeof(buf));
796 const auto [part_header_size, part_entry_overhead] =
797 f->get_part_layout_info();
798 const auto entries_per_part = ((max_part_size - part_header_size) /
799 (max_entry_size + part_entry_overhead));
800 const auto max_entries = entries_per_part * 4 + 1;
801 /* push enough entries */
802 for (auto i = 0u; i < max_entries; ++i) {
803 cb::list bl;
804 *(int *)buf = i;
805 bl.append(buf, sizeof(buf));
806 auto c = R::Rados::aio_create_completion();
807 f->push(bl, c);
808 c->wait_for_complete();
809 r = c->get_return_value();
810 c->release();
811 EXPECT_EQ(0, r);
812 }
813
814 auto info = f->meta();
815 ASSERT_EQ(info.id, fifo_id);
816 /* head should have advanced */
817 ASSERT_GT(info.head_part_num, 0);
818
819 /* list all at once */
820 std::vector<RCf::list_entry> result;
821 bool more = false;
822 auto c = R::Rados::aio_create_completion();
823 f->list(max_entries, std::nullopt, &result, &more, c);
824 c->wait_for_complete();
825 r = c->get_return_value();
826 c->release();
827 EXPECT_EQ(0, r);
828 EXPECT_EQ(false, more);
829 ASSERT_EQ(max_entries, result.size());
830
831 for (auto i = 0u; i < max_entries; ++i) {
832 auto& bl = result[i].data;
833 ASSERT_EQ(i, *(int *)bl.c_str());
834 }
835
836 std::optional<std::string> marker;
837 /* get entries one by one */
838
839 for (auto i = 0u; i < max_entries; ++i) {
840 c = R::Rados::aio_create_completion();
841 f->list(1, marker, &result, &more, c);
842 c->wait_for_complete();
843 r = c->get_return_value();
844 c->release();
845 EXPECT_EQ(0, r);
846 ASSERT_EQ(result.size(), 1);
847 const bool expected_more = (i != (max_entries - 1));
848 ASSERT_EQ(expected_more, more);
849
850 std::uint32_t val;
851 std::tie(val, marker) = decode_entry<std::uint32_t>(result.front());
852
853 auto& entry = result.front();
854 auto& bl = entry.data;
855 ASSERT_EQ(i, *(int *)bl.c_str());
856 marker = entry.marker;
857 }
858
859 /* trim one at a time */
860 marker.reset();
861 for (auto i = 0u; i < max_entries; ++i) {
862 /* read single entry */
863 c = R::Rados::aio_create_completion();
864 f->list(1, marker, &result, &more, c);
865 c->wait_for_complete();
866 r = c->get_return_value();
867 c->release();
868 EXPECT_EQ(0, r);
869 ASSERT_EQ(result.size(), 1);
870 const bool expected_more = (i != (max_entries - 1));
871 ASSERT_EQ(expected_more, more);
872
873 marker = result.front().marker;
874 c = R::Rados::aio_create_completion();
875 f->trim(*marker, false, c);
876 c->wait_for_complete();
877 r = c->get_return_value();
878 c->release();
879 EXPECT_EQ(0, r);
880 ASSERT_EQ(result.size(), 1);
881
882 /* check tail */
883 info = f->meta();
884 ASSERT_EQ(info.tail_part_num, i / entries_per_part);
885
886 /* try to read all again, see how many entries left */
887 c = R::Rados::aio_create_completion();
888 f->list(max_entries, marker, &result, &more, c);
889 c->wait_for_complete();
890 r = c->get_return_value();
891 c->release();
892 EXPECT_EQ(0, r);
893 ASSERT_EQ(max_entries - i - 1, result.size());
894 ASSERT_EQ(false, more);
895 }
896
897 /* tail now should point at head */
898 info = f->meta();
899 ASSERT_EQ(info.head_part_num, info.tail_part_num);
900
901 /* check old tails are removed */
902 for (auto i = 0; i < info.tail_part_num; ++i) {
903 c = R::Rados::aio_create_completion();
904 RCf::part_info partinfo;
905 f->get_part_info(i, &partinfo, c);
906 c->wait_for_complete();
907 r = c->get_return_value();
908 c->release();
909 ASSERT_EQ(-ENOENT, r);
910 }
911 /* check current tail exists */
912 std::uint64_t next_ofs;
913 {
914 c = R::Rados::aio_create_completion();
915 RCf::part_info partinfo;
916 f->get_part_info(info.tail_part_num, &partinfo, c);
917 c->wait_for_complete();
918 r = c->get_return_value();
919 c->release();
920 next_ofs = partinfo.next_ofs;
921 }
922 ASSERT_EQ(0, r);
923
924 c = R::Rados::aio_create_completion();
925 f->get_head_info([&](int r, RCf::part_info&& p) {
926 ASSERT_EQ(next_ofs, p.next_ofs);
927 }, c);
928 c->wait_for_complete();
929 r = c->get_return_value();
930 c->release();
931 ASSERT_EQ(0, r);
932 }
933
934 TEST_F(AioLegacyFIFO, TestTwoPushers)
935 {
936 static constexpr auto max_part_size = 2048ull;
937 static constexpr auto max_entry_size = 128ull;
938
939 std::unique_ptr<RCf::FIFO> f;
940 auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt,
941 std::nullopt, false, max_part_size,
942 max_entry_size);
943 ASSERT_EQ(0, r);
944 char buf[max_entry_size];
945 memset(buf, 0, sizeof(buf));
946
947 auto [part_header_size, part_entry_overhead] = f->get_part_layout_info();
948 const auto entries_per_part = ((max_part_size - part_header_size) /
949 (max_entry_size + part_entry_overhead));
950 const auto max_entries = entries_per_part * 4 + 1;
951 std::unique_ptr<RCf::FIFO> f2;
952 r = RCf::FIFO::open(ioctx, fifo_id, &f2, null_yield);
953 std::vector fifos{&f, &f2};
954
955 for (auto i = 0u; i < max_entries; ++i) {
956 cb::list bl;
957 *(int *)buf = i;
958 bl.append(buf, sizeof(buf));
959 auto& f = *fifos[i % fifos.size()];
960 auto c = R::Rados::aio_create_completion();
961 f->push(bl, c);
962 c->wait_for_complete();
963 r = c->get_return_value();
964 c->release();
965 ASSERT_EQ(0, r);
966 }
967
968 /* list all by both */
969 std::vector<RCf::list_entry> result;
970 bool more = false;
971 auto c = R::Rados::aio_create_completion();
972 f2->list(max_entries, std::nullopt, &result, &more, c);
973 c->wait_for_complete();
974 r = c->get_return_value();
975 c->release();
976 ASSERT_EQ(0, r);
977 ASSERT_EQ(false, more);
978 ASSERT_EQ(max_entries, result.size());
979
980 c = R::Rados::aio_create_completion();
981 f2->list(max_entries, std::nullopt, &result, &more, c);
982 c->wait_for_complete();
983 r = c->get_return_value();
984 c->release();
985 ASSERT_EQ(0, r);
986 ASSERT_EQ(false, more);
987 ASSERT_EQ(max_entries, result.size());
988
989 for (auto i = 0u; i < max_entries; ++i) {
990 auto& bl = result[i].data;
991 ASSERT_EQ(i, *(int *)bl.c_str());
992 }
993 }
994
995 TEST_F(AioLegacyFIFO, TestTwoPushersTrim)
996 {
997 static constexpr auto max_part_size = 2048ull;
998 static constexpr auto max_entry_size = 128ull;
999 std::unique_ptr<RCf::FIFO> f1;
1000 auto r = RCf::FIFO::create(ioctx, fifo_id, &f1, null_yield, std::nullopt,
1001 std::nullopt, false, max_part_size,
1002 max_entry_size);
1003 ASSERT_EQ(0, r);
1004
1005 char buf[max_entry_size];
1006 memset(buf, 0, sizeof(buf));
1007
1008 auto [part_header_size, part_entry_overhead] = f1->get_part_layout_info();
1009 const auto entries_per_part = ((max_part_size - part_header_size) /
1010 (max_entry_size + part_entry_overhead));
1011 const auto max_entries = entries_per_part * 4 + 1;
1012
1013 std::unique_ptr<RCf::FIFO> f2;
1014 r = RCf::FIFO::open(ioctx, fifo_id, &f2, null_yield);
1015 ASSERT_EQ(0, r);
1016
1017 /* push one entry to f2 and the rest to f1 */
1018 for (auto i = 0u; i < max_entries; ++i) {
1019 cb::list bl;
1020 *(int *)buf = i;
1021 bl.append(buf, sizeof(buf));
1022 auto& f = (i < 1 ? f2 : f1);
1023 auto c = R::Rados::aio_create_completion();
1024 f->push(bl, c);
1025 c->wait_for_complete();
1026 r = c->get_return_value();
1027 c->release();
1028 ASSERT_EQ(0, r);
1029 }
1030
1031 /* trim half by fifo1 */
1032 auto num = max_entries / 2;
1033 std::string marker;
1034 std::vector<RCf::list_entry> result;
1035 bool more = false;
1036 auto c = R::Rados::aio_create_completion();
1037 f1->list(num, std::nullopt, &result, &more, c);
1038 c->wait_for_complete();
1039 r = c->get_return_value();
1040 c->release();
1041 ASSERT_EQ(0, r);
1042 ASSERT_EQ(true, more);
1043 ASSERT_EQ(num, result.size());
1044
1045 for (auto i = 0u; i < num; ++i) {
1046 auto& bl = result[i].data;
1047 ASSERT_EQ(i, *(int *)bl.c_str());
1048 }
1049
1050 auto& entry = result[num - 1];
1051 marker = entry.marker;
1052 c = R::Rados::aio_create_completion();
1053 f1->trim(marker, false, c);
1054 c->wait_for_complete();
1055 r = c->get_return_value();
1056 c->release();
1057 ASSERT_EQ(0, r);
1058 /* list what's left by fifo2 */
1059
1060 const auto left = max_entries - num;
1061 c = R::Rados::aio_create_completion();
1062 f2->list(left, marker, &result, &more, c);
1063 c->wait_for_complete();
1064 r = c->get_return_value();
1065 c->release();
1066 ASSERT_EQ(0, r);
1067 ASSERT_EQ(left, result.size());
1068 ASSERT_EQ(false, more);
1069
1070 for (auto i = num; i < max_entries; ++i) {
1071 auto& bl = result[i - num].data;
1072 ASSERT_EQ(i, *(int *)bl.c_str());
1073 }
1074 }
1075
1076 TEST_F(AioLegacyFIFO, TestPushBatch)
1077 {
1078 static constexpr auto max_part_size = 2048ull;
1079 static constexpr auto max_entry_size = 128ull;
1080
1081 std::unique_ptr<RCf::FIFO> f;
1082 auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt,
1083 std::nullopt, false, max_part_size,
1084 max_entry_size);
1085 ASSERT_EQ(0, r);
1086
1087 char buf[max_entry_size];
1088 memset(buf, 0, sizeof(buf));
1089 auto [part_header_size, part_entry_overhead] = f->get_part_layout_info();
1090 auto entries_per_part = ((max_part_size - part_header_size) /
1091 (max_entry_size + part_entry_overhead));
1092 auto max_entries = entries_per_part * 4 + 1; /* enough entries to span multiple parts */
1093 std::vector<cb::list> bufs;
1094 for (auto i = 0u; i < max_entries; ++i) {
1095 cb::list bl;
1096 *(int *)buf = i;
1097 bl.append(buf, sizeof(buf));
1098 bufs.push_back(bl);
1099 }
1100 ASSERT_EQ(max_entries, bufs.size());
1101
1102 auto c = R::Rados::aio_create_completion();
1103 f->push(bufs, c);
1104 c->wait_for_complete();
1105 r = c->get_return_value();
1106 c->release();
1107 ASSERT_EQ(0, r);
1108
1109 /* list all */
1110
1111 std::vector<RCf::list_entry> result;
1112 bool more = false;
1113 c = R::Rados::aio_create_completion();
1114 f->list(max_entries, std::nullopt, &result, &more, c);
1115 c->wait_for_complete();
1116 r = c->get_return_value();
1117 c->release();
1118 ASSERT_EQ(0, r);
1119 ASSERT_EQ(false, more);
1120 ASSERT_EQ(max_entries, result.size());
1121 for (auto i = 0u; i < max_entries; ++i) {
1122 auto& bl = result[i].data;
1123 ASSERT_EQ(i, *(int *)bl.c_str());
1124 }
1125 auto& info = f->meta();
1126 ASSERT_EQ(info.head_part_num, 4);
1127 }
1128
1129 TEST_F(LegacyFIFO, TrimAll)
1130 {
1131 std::unique_ptr<RCf::FIFO> f;
1132 auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield);
1133 ASSERT_EQ(0, r);
1134 static constexpr auto max_entries = 10u;
1135 for (uint32_t i = 0; i < max_entries; ++i) {
1136 cb::list bl;
1137 encode(i, bl);
1138 r = f->push(bl, null_yield);
1139 ASSERT_EQ(0, r);
1140 }
1141
1142 /* trim one entry */
1143 r = f->trim(RCf::marker::max().to_string(), false, null_yield);
1144 ASSERT_EQ(-ENODATA, r);
1145
1146 std::vector<RCf::list_entry> result;
1147 bool more;
1148 r = f->list(1, std::nullopt, &result, &more, null_yield);
1149 ASSERT_EQ(0, r);
1150 ASSERT_TRUE(result.empty());
1151 }
1152
1153 TEST_F(LegacyFIFO, AioTrimAll)
1154 {
1155 std::unique_ptr<RCf::FIFO> f;
1156 auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield);
1157 ASSERT_EQ(0, r);
1158 static constexpr auto max_entries = 10u;
1159 for (uint32_t i = 0; i < max_entries; ++i) {
1160 cb::list bl;
1161 encode(i, bl);
1162 r = f->push(bl, null_yield);
1163 ASSERT_EQ(0, r);
1164 }
1165
1166 auto c = R::Rados::aio_create_completion();
1167 f->trim(RCf::marker::max().to_string(), false, c);
1168 c->wait_for_complete();
1169 r = c->get_return_value();
1170 c->release();
1171 ASSERT_EQ(-ENODATA, r);
1172
1173 std::vector<RCf::list_entry> result;
1174 bool more;
1175 r = f->list(1, std::nullopt, &result, &more, null_yield);
1176 ASSERT_EQ(0, r);
1177 ASSERT_TRUE(result.empty());
1178 }