]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/rgw/test_cls_fifo_legacy.cc
e4c4375c4de337d3ea497037722616d25c9cbc47
[ceph.git] / ceph / src / test / rgw / test_cls_fifo_legacy.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2019 Red Hat, Inc.
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <cerrno>
16 #include <iostream>
17 #include <string_view>
18
19 #include "include/scope_guard.h"
20 #include "include/types.h"
21 #include "include/rados/librados.hpp"
22 #include "common/ceph_context.h"
23
24 #include "cls/fifo/cls_fifo_ops.h"
25 #include "test/librados/test_cxx.h"
26 #include "global/global_context.h"
27
28 #include "rgw/rgw_tools.h"
29 #include "rgw/cls_fifo_legacy.h"
30
31 #include "gtest/gtest.h"
32
33 using namespace std::literals;
34 using namespace std::string_literals;
35
36 namespace R = librados;
37 namespace cb = ceph::buffer;
38 namespace fifo = rados::cls::fifo;
39 namespace RCf = rgw::cls::fifo;
40
41 auto cct = new CephContext(CEPH_ENTITY_TYPE_CLIENT);
42 const DoutPrefix dp(cct, 1, "test legacy cls fifo: ");
43
44 namespace {
45 int fifo_create(const DoutPrefixProvider *dpp, R::IoCtx& ioctx,
46 const std::string& oid,
47 std::string_view id,
48 optional_yield y,
49 std::optional<fifo::objv> objv = std::nullopt,
50 std::optional<std::string_view> oid_prefix = std::nullopt,
51 bool exclusive = false,
52 std::uint64_t max_part_size = RCf::default_max_part_size,
53 std::uint64_t max_entry_size = RCf::default_max_entry_size)
54 {
55 R::ObjectWriteOperation op;
56 RCf::create_meta(&op, id, objv, oid_prefix, exclusive, max_part_size,
57 max_entry_size);
58 return rgw_rados_operate(dpp, ioctx, oid, &op, y);
59 }
60 }
61
62 class LegacyFIFO : public testing::Test {
63 protected:
64 const std::string pool_name = get_temp_pool_name();
65 const std::string fifo_id = "fifo";
66 R::Rados rados;
67 librados::IoCtx ioctx;
68
69 void SetUp() override {
70 ASSERT_EQ("", create_one_pool_pp(pool_name, rados));
71 ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx));
72 }
73 void TearDown() override {
74 destroy_one_pool_pp(pool_name, rados);
75 }
76 };
77
78 using LegacyClsFIFO = LegacyFIFO;
79 using AioLegacyFIFO = LegacyFIFO;
80
81
82 TEST_F(LegacyClsFIFO, TestCreate)
83 {
84 auto r = fifo_create(&dp, ioctx, fifo_id, ""s, null_yield);
85 EXPECT_EQ(-EINVAL, r);
86 r = fifo_create(&dp, ioctx, fifo_id, fifo_id, null_yield, std::nullopt,
87 std::nullopt, false, 0);
88 EXPECT_EQ(-EINVAL, r);
89 r = fifo_create(&dp, ioctx, fifo_id, {}, null_yield,
90 std::nullopt, std::nullopt,
91 false, RCf::default_max_part_size, 0);
92 EXPECT_EQ(-EINVAL, r);
93 r = fifo_create(&dp, ioctx, fifo_id, fifo_id, null_yield);
94 EXPECT_EQ(0, r);
95 std::uint64_t size;
96 ioctx.stat(fifo_id, &size, nullptr);
97 EXPECT_GT(size, 0);
98 /* test idempotency */
99 r = fifo_create(&dp, ioctx, fifo_id, fifo_id, null_yield);
100 EXPECT_EQ(0, r);
101 r = fifo_create(&dp, ioctx, fifo_id, {}, null_yield, std::nullopt,
102 std::nullopt, false);
103 EXPECT_EQ(-EINVAL, r);
104 r = fifo_create(&dp, ioctx, fifo_id, {}, null_yield, std::nullopt,
105 "myprefix"sv, false);
106 EXPECT_EQ(-EINVAL, r);
107 r = fifo_create(&dp, ioctx, fifo_id, "foo"sv, null_yield,
108 std::nullopt, std::nullopt, false);
109 EXPECT_EQ(-EEXIST, r);
110 }
111
112 TEST_F(LegacyClsFIFO, TestGetInfo)
113 {
114 auto r = fifo_create(&dp, ioctx, fifo_id, fifo_id, null_yield);
115 fifo::info info;
116 std::uint32_t part_header_size;
117 std::uint32_t part_entry_overhead;
118 r = RCf::get_meta(&dp, ioctx, fifo_id, std::nullopt, &info, &part_header_size,
119 &part_entry_overhead, 0, null_yield);
120 EXPECT_EQ(0, r);
121 EXPECT_GT(part_header_size, 0);
122 EXPECT_GT(part_entry_overhead, 0);
123 EXPECT_FALSE(info.version.instance.empty());
124
125 r = RCf::get_meta(&dp, ioctx, fifo_id, info.version, &info, &part_header_size,
126 &part_entry_overhead, 0, null_yield);
127 EXPECT_EQ(0, r);
128 fifo::objv objv;
129 objv.instance = "foo";
130 objv.ver = 12;
131 r = RCf::get_meta(&dp, ioctx, fifo_id, objv, &info, &part_header_size,
132 &part_entry_overhead, 0, null_yield);
133 EXPECT_EQ(-ECANCELED, r);
134 }
135
136 TEST_F(LegacyFIFO, TestOpenDefault)
137 {
138 std::unique_ptr<RCf::FIFO> fifo;
139 auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &fifo, null_yield);
140 ASSERT_EQ(0, r);
141 // force reading from backend
142 r = fifo->read_meta(&dp, null_yield);
143 EXPECT_EQ(0, r);
144 auto info = fifo->meta();
145 EXPECT_EQ(info.id, fifo_id);
146 }
147
148 TEST_F(LegacyFIFO, TestOpenParams)
149 {
150 const std::uint64_t max_part_size = 10 * 1024;
151 const std::uint64_t max_entry_size = 128;
152 auto oid_prefix = "foo.123."sv;
153 fifo::objv objv;
154 objv.instance = "fooz"s;
155 objv.ver = 10;
156
157 /* first successful create */
158 std::unique_ptr<RCf::FIFO> f;
159 auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, objv, oid_prefix,
160 false, max_part_size, max_entry_size);
161 ASSERT_EQ(0, r);
162
163 /* force reading from backend */
164 r = f->read_meta(&dp, null_yield);
165 auto info = f->meta();
166 EXPECT_EQ(info.id, fifo_id);
167 EXPECT_EQ(info.params.max_part_size, max_part_size);
168 EXPECT_EQ(info.params.max_entry_size, max_entry_size);
169 EXPECT_EQ(info.version, objv);
170 }
171
172 namespace {
173 template<class T>
174 std::pair<T, std::string> decode_entry(const RCf::list_entry& entry)
175 {
176 T val;
177 auto iter = entry.data.cbegin();
178 decode(val, iter);
179 return std::make_pair(std::move(val), entry.marker);
180 }
181 }
182
183
184 TEST_F(LegacyFIFO, TestPushListTrim)
185 {
186 std::unique_ptr<RCf::FIFO> f;
187 auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield);
188 ASSERT_EQ(0, r);
189 static constexpr auto max_entries = 10u;
190 for (uint32_t i = 0; i < max_entries; ++i) {
191 cb::list bl;
192 encode(i, bl);
193 r = f->push(&dp, bl, null_yield);
194 ASSERT_EQ(0, r);
195 }
196
197 std::optional<std::string> marker;
198 /* get entries one by one */
199 std::vector<RCf::list_entry> result;
200 bool more = false;
201 for (auto i = 0u; i < max_entries; ++i) {
202
203 r = f->list(&dp, 1, marker, &result, &more, null_yield);
204 ASSERT_EQ(0, r);
205
206 bool expected_more = (i != (max_entries - 1));
207 ASSERT_EQ(expected_more, more);
208 ASSERT_EQ(1, result.size());
209
210 std::uint32_t val;
211 std::tie(val, marker) = decode_entry<std::uint32_t>(result.front());
212
213 ASSERT_EQ(i, val);
214 result.clear();
215 }
216
217 /* get all entries at once */
218 std::string markers[max_entries];
219 std::uint32_t min_entry = 0;
220 r = f->list(&dp, max_entries * 10, std::nullopt, &result, &more, null_yield);
221 ASSERT_EQ(0, r);
222
223 ASSERT_FALSE(more);
224 ASSERT_EQ(max_entries, result.size());
225 for (auto i = 0u; i < max_entries; ++i) {
226 std::uint32_t val;
227 std::tie(val, markers[i]) = decode_entry<std::uint32_t>(result[i]);
228 ASSERT_EQ(i, val);
229 }
230
231 /* trim one entry */
232 r = f->trim(&dp, markers[min_entry], false, null_yield);
233 ASSERT_EQ(0, r);
234 ++min_entry;
235
236 r = f->list(&dp, max_entries * 10, std::nullopt, &result, &more, null_yield);
237 ASSERT_EQ(0, r);
238 ASSERT_FALSE(more);
239 ASSERT_EQ(max_entries - min_entry, result.size());
240
241 for (auto i = min_entry; i < max_entries; ++i) {
242 std::uint32_t val;
243 std::tie(val, markers[i - min_entry]) =
244 decode_entry<std::uint32_t>(result[i - min_entry]);
245 EXPECT_EQ(i, val);
246 }
247 }
248
249
250 TEST_F(LegacyFIFO, TestPushTooBig)
251 {
252 static constexpr auto max_part_size = 2048ull;
253 static constexpr auto max_entry_size = 128ull;
254
255 std::unique_ptr<RCf::FIFO> f;
256 auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, std::nullopt,
257 std::nullopt, false, max_part_size, max_entry_size);
258 ASSERT_EQ(0, r);
259
260 char buf[max_entry_size + 1];
261 memset(buf, 0, sizeof(buf));
262
263 cb::list bl;
264 bl.append(buf, sizeof(buf));
265
266 r = f->push(&dp, bl, null_yield);
267 EXPECT_EQ(-E2BIG, r);
268 }
269
270
271 TEST_F(LegacyFIFO, TestMultipleParts)
272 {
273 static constexpr auto max_part_size = 2048ull;
274 static constexpr auto max_entry_size = 128ull;
275 std::unique_ptr<RCf::FIFO> f;
276 auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, std::nullopt,
277 std::nullopt, false, max_part_size,
278 max_entry_size);
279 ASSERT_EQ(0, r);
280
281 char buf[max_entry_size];
282 memset(buf, 0, sizeof(buf));
283 const auto [part_header_size, part_entry_overhead] =
284 f->get_part_layout_info();
285 const auto entries_per_part = ((max_part_size - part_header_size) /
286 (max_entry_size + part_entry_overhead));
287 const auto max_entries = entries_per_part * 4 + 1;
288 /* push enough entries */
289 for (auto i = 0u; i < max_entries; ++i) {
290 cb::list bl;
291 *(int *)buf = i;
292 bl.append(buf, sizeof(buf));
293 r = f->push(&dp, bl, null_yield);
294 ASSERT_EQ(0, r);
295 }
296
297 auto info = f->meta();
298 ASSERT_EQ(info.id, fifo_id);
299 /* head should have advanced */
300 ASSERT_GT(info.head_part_num, 0);
301
302 /* list all at once */
303 std::vector<RCf::list_entry> result;
304 bool more = false;
305 r = f->list(&dp, max_entries, std::nullopt, &result, &more, null_yield);
306 ASSERT_EQ(0, r);
307 EXPECT_EQ(false, more);
308 ASSERT_EQ(max_entries, result.size());
309
310 for (auto i = 0u; i < max_entries; ++i) {
311 auto& bl = result[i].data;
312 ASSERT_EQ(i, *(int *)bl.c_str());
313 }
314
315 std::optional<std::string> marker;
316 /* get entries one by one */
317
318 for (auto i = 0u; i < max_entries; ++i) {
319 r = f->list(&dp, 1, marker, &result, &more, null_yield);
320 ASSERT_EQ(0, r);
321 ASSERT_EQ(result.size(), 1);
322 const bool expected_more = (i != (max_entries - 1));
323 ASSERT_EQ(expected_more, more);
324
325 std::uint32_t val;
326 std::tie(val, marker) = decode_entry<std::uint32_t>(result.front());
327
328 auto& entry = result.front();
329 auto& bl = entry.data;
330 ASSERT_EQ(i, *(int *)bl.c_str());
331 marker = entry.marker;
332 }
333
334 /* trim one at a time */
335 marker.reset();
336 for (auto i = 0u; i < max_entries; ++i) {
337 /* read single entry */
338 r = f->list(&dp, 1, marker, &result, &more, null_yield);
339 ASSERT_EQ(0, r);
340 ASSERT_EQ(result.size(), 1);
341 const bool expected_more = (i != (max_entries - 1));
342 ASSERT_EQ(expected_more, more);
343
344 marker = result.front().marker;
345 r = f->trim(&dp, *marker, false, null_yield);
346 ASSERT_EQ(0, r);
347
348 /* check tail */
349 info = f->meta();
350 ASSERT_EQ(info.tail_part_num, i / entries_per_part);
351
352 /* try to read all again, see how many entries left */
353 r = f->list(&dp, max_entries, marker, &result, &more, null_yield);
354 ASSERT_EQ(max_entries - i - 1, result.size());
355 ASSERT_EQ(false, more);
356 }
357
358 /* tail now should point at head */
359 info = f->meta();
360 ASSERT_EQ(info.head_part_num, info.tail_part_num);
361
362 RCf::part_info partinfo;
363 /* check old tails are removed */
364 for (auto i = 0; i < info.tail_part_num; ++i) {
365 r = f->get_part_info(&dp, i, &partinfo, null_yield);
366 ASSERT_EQ(-ENOENT, r);
367 }
368 /* check current tail exists */
369 r = f->get_part_info(&dp, info.tail_part_num, &partinfo, null_yield);
370 ASSERT_EQ(0, r);
371 }
372
373 TEST_F(LegacyFIFO, TestTwoPushers)
374 {
375 static constexpr auto max_part_size = 2048ull;
376 static constexpr auto max_entry_size = 128ull;
377
378 std::unique_ptr<RCf::FIFO> f;
379 auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, std::nullopt,
380 std::nullopt, false, max_part_size,
381 max_entry_size);
382 ASSERT_EQ(0, r);
383 char buf[max_entry_size];
384 memset(buf, 0, sizeof(buf));
385
386 auto [part_header_size, part_entry_overhead] = f->get_part_layout_info();
387 const auto entries_per_part = ((max_part_size - part_header_size) /
388 (max_entry_size + part_entry_overhead));
389 const auto max_entries = entries_per_part * 4 + 1;
390 std::unique_ptr<RCf::FIFO> f2;
391 r = RCf::FIFO::open(&dp, ioctx, fifo_id, &f2, null_yield);
392 std::vector fifos{&f, &f2};
393
394 for (auto i = 0u; i < max_entries; ++i) {
395 cb::list bl;
396 *(int *)buf = i;
397 bl.append(buf, sizeof(buf));
398 auto& f = *fifos[i % fifos.size()];
399 r = f->push(&dp, bl, null_yield);
400 ASSERT_EQ(0, r);
401 }
402
403 /* list all by both */
404 std::vector<RCf::list_entry> result;
405 bool more = false;
406 r = f2->list(&dp, max_entries, std::nullopt, &result, &more, null_yield);
407 ASSERT_EQ(0, r);
408 ASSERT_EQ(false, more);
409 ASSERT_EQ(max_entries, result.size());
410
411 r = f2->list(&dp, max_entries, std::nullopt, &result, &more, null_yield);
412 ASSERT_EQ(0, r);
413 ASSERT_EQ(false, more);
414 ASSERT_EQ(max_entries, result.size());
415
416 for (auto i = 0u; i < max_entries; ++i) {
417 auto& bl = result[i].data;
418 ASSERT_EQ(i, *(int *)bl.c_str());
419 }
420 }
421
422 TEST_F(LegacyFIFO, TestTwoPushersTrim)
423 {
424 static constexpr auto max_part_size = 2048ull;
425 static constexpr auto max_entry_size = 128ull;
426 std::unique_ptr<RCf::FIFO> f1;
427 auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f1, null_yield, std::nullopt,
428 std::nullopt, false, max_part_size,
429 max_entry_size);
430 ASSERT_EQ(0, r);
431
432 char buf[max_entry_size];
433 memset(buf, 0, sizeof(buf));
434
435 auto [part_header_size, part_entry_overhead] = f1->get_part_layout_info();
436 const auto entries_per_part = ((max_part_size - part_header_size) /
437 (max_entry_size + part_entry_overhead));
438 const auto max_entries = entries_per_part * 4 + 1;
439
440 std::unique_ptr<RCf::FIFO> f2;
441 r = RCf::FIFO::open(&dp, ioctx, fifo_id, &f2, null_yield);
442 ASSERT_EQ(0, r);
443
444 /* push one entry to f2 and the rest to f1 */
445 for (auto i = 0u; i < max_entries; ++i) {
446 cb::list bl;
447 *(int *)buf = i;
448 bl.append(buf, sizeof(buf));
449 auto& f = (i < 1 ? f2 : f1);
450 r = f->push(&dp, bl, null_yield);
451 ASSERT_EQ(0, r);
452 }
453
454 /* trim half by fifo1 */
455 auto num = max_entries / 2;
456 std::string marker;
457 std::vector<RCf::list_entry> result;
458 bool more = false;
459 r = f1->list(&dp, num, std::nullopt, &result, &more, null_yield);
460 ASSERT_EQ(0, r);
461 ASSERT_EQ(true, more);
462 ASSERT_EQ(num, result.size());
463
464 for (auto i = 0u; i < num; ++i) {
465 auto& bl = result[i].data;
466 ASSERT_EQ(i, *(int *)bl.c_str());
467 }
468
469 auto& entry = result[num - 1];
470 marker = entry.marker;
471 r = f1->trim(&dp, marker, false, null_yield);
472 /* list what's left by fifo2 */
473
474 const auto left = max_entries - num;
475 f2->list(&dp, left, marker, &result, &more, null_yield);
476 ASSERT_EQ(left, result.size());
477 ASSERT_EQ(false, more);
478
479 for (auto i = num; i < max_entries; ++i) {
480 auto& bl = result[i - num].data;
481 ASSERT_EQ(i, *(int *)bl.c_str());
482 }
483 }
484
485 TEST_F(LegacyFIFO, TestPushBatch)
486 {
487 static constexpr auto max_part_size = 2048ull;
488 static constexpr auto max_entry_size = 128ull;
489
490 std::unique_ptr<RCf::FIFO> f;
491 auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, std::nullopt,
492 std::nullopt, false, max_part_size,
493 max_entry_size);
494 ASSERT_EQ(0, r);
495
496 char buf[max_entry_size];
497 memset(buf, 0, sizeof(buf));
498 auto [part_header_size, part_entry_overhead] = f->get_part_layout_info();
499 auto entries_per_part = ((max_part_size - part_header_size) /
500 (max_entry_size + part_entry_overhead));
501 auto max_entries = entries_per_part * 4 + 1; /* enough entries to span multiple parts */
502 std::vector<cb::list> bufs;
503 for (auto i = 0u; i < max_entries; ++i) {
504 cb::list bl;
505 *(int *)buf = i;
506 bl.append(buf, sizeof(buf));
507 bufs.push_back(bl);
508 }
509 ASSERT_EQ(max_entries, bufs.size());
510
511 r = f->push(&dp, bufs, null_yield);
512 ASSERT_EQ(0, r);
513
514 /* list all */
515
516 std::vector<RCf::list_entry> result;
517 bool more = false;
518 r = f->list(&dp, max_entries, std::nullopt, &result, &more, null_yield);
519 ASSERT_EQ(0, r);
520 ASSERT_EQ(false, more);
521 ASSERT_EQ(max_entries, result.size());
522 for (auto i = 0u; i < max_entries; ++i) {
523 auto& bl = result[i].data;
524 ASSERT_EQ(i, *(int *)bl.c_str());
525 }
526 auto& info = f->meta();
527 ASSERT_EQ(info.head_part_num, 4);
528 }
529
530 TEST_F(LegacyFIFO, TestAioTrim)
531 {
532 static constexpr auto max_part_size = 2048ull;
533 static constexpr auto max_entry_size = 128ull;
534 std::unique_ptr<RCf::FIFO> f;
535 auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, std::nullopt,
536 std::nullopt, false, max_part_size,
537 max_entry_size);
538 ASSERT_EQ(0, r);
539
540 char buf[max_entry_size];
541 memset(buf, 0, sizeof(buf));
542 const auto [part_header_size, part_entry_overhead] =
543 f->get_part_layout_info();
544 const auto entries_per_part = ((max_part_size - part_header_size) /
545 (max_entry_size + part_entry_overhead));
546 const auto max_entries = entries_per_part * 4 + 1;
547 /* push enough entries */
548 std::vector<cb::list> bufs;
549 for (auto i = 0u; i < max_entries; ++i) {
550 cb::list bl;
551 *(int *)buf = i;
552 bl.append(buf, sizeof(buf));
553 bufs.push_back(std::move(bl));
554 }
555 ASSERT_EQ(max_entries, bufs.size());
556
557 r = f->push(&dp, bufs, null_yield);
558 ASSERT_EQ(0, r);
559
560 auto info = f->meta();
561 ASSERT_EQ(info.id, fifo_id);
562 /* head should have advanced */
563 ASSERT_GT(info.head_part_num, 0);
564
565 /* list all at once */
566 std::vector<RCf::list_entry> result;
567 bool more = false;
568 r = f->list(&dp, max_entries, std::nullopt, &result, &more, null_yield);
569 ASSERT_EQ(0, r);
570 ASSERT_EQ(false, more);
571 ASSERT_EQ(max_entries, result.size());
572
573 std::optional<std::string> marker;
574 /* trim one at a time */
575 result.clear();
576 more = false;
577 marker.reset();
578 for (auto i = 0u; i < max_entries; ++i) {
579 /* read single entry */
580 r = f->list(&dp, 1, marker, &result, &more, null_yield);
581 ASSERT_EQ(0, r);
582 ASSERT_EQ(result.size(), 1);
583 const bool expected_more = (i != (max_entries - 1));
584 ASSERT_EQ(expected_more, more);
585
586 marker = result.front().marker;
587 std::unique_ptr<R::AioCompletion> c(rados.aio_create_completion(nullptr,
588 nullptr));
589 f->trim(&dp, *marker, false, c.get());
590 c->wait_for_complete();
591 r = c->get_return_value();
592 ASSERT_EQ(0, r);
593
594 /* check tail */
595 info = f->meta();
596 ASSERT_EQ(info.tail_part_num, i / entries_per_part);
597
598 /* try to read all again, see how many entries left */
599 r = f->list(&dp, max_entries, marker, &result, &more, null_yield);
600 ASSERT_EQ(max_entries - i - 1, result.size());
601 ASSERT_EQ(false, more);
602 }
603
604 /* tail now should point at head */
605 info = f->meta();
606 ASSERT_EQ(info.head_part_num, info.tail_part_num);
607
608 RCf::part_info partinfo;
609 /* check old tails are removed */
610 for (auto i = 0; i < info.tail_part_num; ++i) {
611 r = f->get_part_info(&dp, i, &partinfo, null_yield);
612 ASSERT_EQ(-ENOENT, r);
613 }
614 /* check current tail exists */
615 r = f->get_part_info(&dp, info.tail_part_num, &partinfo, null_yield);
616 ASSERT_EQ(0, r);
617 }
618
619 TEST_F(LegacyFIFO, TestTrimExclusive) {
620 std::unique_ptr<RCf::FIFO> f;
621 auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield);
622 ASSERT_EQ(0, r);
623 std::vector<RCf::list_entry> result;
624 bool more = false;
625
626 static constexpr auto max_entries = 10u;
627 for (uint32_t i = 0; i < max_entries; ++i) {
628 cb::list bl;
629 encode(i, bl);
630 f->push(&dp, bl, null_yield);
631 }
632
633 f->list(&dp, 1, std::nullopt, &result, &more, null_yield);
634 auto [val, marker] = decode_entry<std::uint32_t>(result.front());
635 ASSERT_EQ(0, val);
636 f->trim(&dp, marker, true, null_yield);
637
638 result.clear();
639 f->list(&dp, max_entries, std::nullopt, &result, &more, null_yield);
640 std::tie(val, marker) = decode_entry<std::uint32_t>(result.front());
641 ASSERT_EQ(0, val);
642 f->trim(&dp, result[4].marker, true, null_yield);
643
644 result.clear();
645 f->list(&dp, max_entries, std::nullopt, &result, &more, null_yield);
646 std::tie(val, marker) = decode_entry<std::uint32_t>(result.front());
647 ASSERT_EQ(4, val);
648 f->trim(&dp, result.back().marker, true, null_yield);
649
650 result.clear();
651 f->list(&dp, max_entries, std::nullopt, &result, &more, null_yield);
652 std::tie(val, marker) = decode_entry<std::uint32_t>(result.front());
653 ASSERT_EQ(result.size(), 1);
654 ASSERT_EQ(max_entries - 1, val);
655 }
656
657 TEST_F(AioLegacyFIFO, TestPushListTrim)
658 {
659 std::unique_ptr<RCf::FIFO> f;
660 auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield);
661 ASSERT_EQ(0, r);
662 static constexpr auto max_entries = 10u;
663 for (uint32_t i = 0; i < max_entries; ++i) {
664 cb::list bl;
665 encode(i, bl);
666 auto c = R::Rados::aio_create_completion();
667 f->push(&dp, bl, c);
668 c->wait_for_complete();
669 r = c->get_return_value();
670 c->release();
671 ASSERT_EQ(0, r);
672 }
673
674 std::optional<std::string> marker;
675 /* get entries one by one */
676 std::vector<RCf::list_entry> result;
677 bool more = false;
678 for (auto i = 0u; i < max_entries; ++i) {
679 auto c = R::Rados::aio_create_completion();
680 f->list(&dp, 1, marker, &result, &more, c);
681 c->wait_for_complete();
682 r = c->get_return_value();
683 c->release();
684 ASSERT_EQ(0, r);
685
686 bool expected_more = (i != (max_entries - 1));
687 ASSERT_EQ(expected_more, more);
688 ASSERT_EQ(1, result.size());
689
690 std::uint32_t val;
691 std::tie(val, marker) = decode_entry<std::uint32_t>(result.front());
692
693 ASSERT_EQ(i, val);
694 result.clear();
695 }
696
697 /* get all entries at once */
698 std::string markers[max_entries];
699 std::uint32_t min_entry = 0;
700 auto c = R::Rados::aio_create_completion();
701 f->list(&dp, max_entries * 10, std::nullopt, &result, &more, c);
702 c->wait_for_complete();
703 r = c->get_return_value();
704 c->release();
705 ASSERT_EQ(0, r);
706
707 ASSERT_FALSE(more);
708 ASSERT_EQ(max_entries, result.size());
709 for (auto i = 0u; i < max_entries; ++i) {
710 std::uint32_t val;
711 std::tie(val, markers[i]) = decode_entry<std::uint32_t>(result[i]);
712 ASSERT_EQ(i, val);
713 }
714
715 /* trim one entry */
716 c = R::Rados::aio_create_completion();
717 f->trim(&dp, markers[min_entry], false, c);
718 c->wait_for_complete();
719 r = c->get_return_value();
720 c->release();
721 ASSERT_EQ(0, r);
722 ++min_entry;
723
724 c = R::Rados::aio_create_completion();
725 f->list(&dp, max_entries * 10, std::nullopt, &result, &more, c);
726 c->wait_for_complete();
727 r = c->get_return_value();
728 c->release();
729 ASSERT_EQ(0, r);
730 ASSERT_FALSE(more);
731 ASSERT_EQ(max_entries - min_entry, result.size());
732
733 for (auto i = min_entry; i < max_entries; ++i) {
734 std::uint32_t val;
735 std::tie(val, markers[i - min_entry]) =
736 decode_entry<std::uint32_t>(result[i - min_entry]);
737 EXPECT_EQ(i, val);
738 }
739 }
740
741
742 TEST_F(AioLegacyFIFO, TestPushTooBig)
743 {
744 static constexpr auto max_part_size = 2048ull;
745 static constexpr auto max_entry_size = 128ull;
746
747 std::unique_ptr<RCf::FIFO> f;
748 auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, std::nullopt,
749 std::nullopt, false, max_part_size, max_entry_size);
750 ASSERT_EQ(0, r);
751
752 char buf[max_entry_size + 1];
753 memset(buf, 0, sizeof(buf));
754
755 cb::list bl;
756 bl.append(buf, sizeof(buf));
757
758 auto c = R::Rados::aio_create_completion();
759 f->push(&dp, bl, c);
760 c->wait_for_complete();
761 r = c->get_return_value();
762 ASSERT_EQ(-E2BIG, r);
763 c->release();
764
765 c = R::Rados::aio_create_completion();
766 f->push(&dp, std::vector<cb::list>{}, c);
767 c->wait_for_complete();
768 r = c->get_return_value();
769 c->release();
770 EXPECT_EQ(0, r);
771 }
772
773
774 TEST_F(AioLegacyFIFO, TestMultipleParts)
775 {
776 static constexpr auto max_part_size = 2048ull;
777 static constexpr auto max_entry_size = 128ull;
778 std::unique_ptr<RCf::FIFO> f;
779 auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, std::nullopt,
780 std::nullopt, false, max_part_size,
781 max_entry_size);
782 ASSERT_EQ(0, r);
783
784 {
785 auto c = R::Rados::aio_create_completion();
786 f->get_head_info(&dp, [&](int r, RCf::part_info&& p) {
787 ASSERT_TRUE(p.tag.empty());
788 ASSERT_EQ(0, p.magic);
789 ASSERT_EQ(0, p.min_ofs);
790 ASSERT_EQ(0, p.last_ofs);
791 ASSERT_EQ(0, p.next_ofs);
792 ASSERT_EQ(0, p.min_index);
793 ASSERT_EQ(0, p.max_index);
794 ASSERT_EQ(ceph::real_time{}, p.max_time);
795 }, c);
796 c->wait_for_complete();
797 r = c->get_return_value();
798 c->release();
799 }
800
801 char buf[max_entry_size];
802 memset(buf, 0, sizeof(buf));
803 const auto [part_header_size, part_entry_overhead] =
804 f->get_part_layout_info();
805 const auto entries_per_part = ((max_part_size - part_header_size) /
806 (max_entry_size + part_entry_overhead));
807 const auto max_entries = entries_per_part * 4 + 1;
808 /* push enough entries */
809 for (auto i = 0u; i < max_entries; ++i) {
810 cb::list bl;
811 *(int *)buf = i;
812 bl.append(buf, sizeof(buf));
813 auto c = R::Rados::aio_create_completion();
814 f->push(&dp, bl, c);
815 c->wait_for_complete();
816 r = c->get_return_value();
817 c->release();
818 EXPECT_EQ(0, r);
819 }
820
821 auto info = f->meta();
822 ASSERT_EQ(info.id, fifo_id);
823 /* head should have advanced */
824 ASSERT_GT(info.head_part_num, 0);
825
826 /* list all at once */
827 std::vector<RCf::list_entry> result;
828 bool more = false;
829 auto c = R::Rados::aio_create_completion();
830 f->list(&dp, max_entries, std::nullopt, &result, &more, c);
831 c->wait_for_complete();
832 r = c->get_return_value();
833 c->release();
834 EXPECT_EQ(0, r);
835 EXPECT_EQ(false, more);
836 ASSERT_EQ(max_entries, result.size());
837
838 for (auto i = 0u; i < max_entries; ++i) {
839 auto& bl = result[i].data;
840 ASSERT_EQ(i, *(int *)bl.c_str());
841 }
842
843 std::optional<std::string> marker;
844 /* get entries one by one */
845
846 for (auto i = 0u; i < max_entries; ++i) {
847 c = R::Rados::aio_create_completion();
848 f->list(&dp, 1, marker, &result, &more, c);
849 c->wait_for_complete();
850 r = c->get_return_value();
851 c->release();
852 EXPECT_EQ(0, r);
853 ASSERT_EQ(result.size(), 1);
854 const bool expected_more = (i != (max_entries - 1));
855 ASSERT_EQ(expected_more, more);
856
857 std::uint32_t val;
858 std::tie(val, marker) = decode_entry<std::uint32_t>(result.front());
859
860 auto& entry = result.front();
861 auto& bl = entry.data;
862 ASSERT_EQ(i, *(int *)bl.c_str());
863 marker = entry.marker;
864 }
865
866 /* trim one at a time */
867 marker.reset();
868 for (auto i = 0u; i < max_entries; ++i) {
869 /* read single entry */
870 c = R::Rados::aio_create_completion();
871 f->list(&dp, 1, marker, &result, &more, c);
872 c->wait_for_complete();
873 r = c->get_return_value();
874 c->release();
875 EXPECT_EQ(0, r);
876 ASSERT_EQ(result.size(), 1);
877 const bool expected_more = (i != (max_entries - 1));
878 ASSERT_EQ(expected_more, more);
879
880 marker = result.front().marker;
881 c = R::Rados::aio_create_completion();
882 f->trim(&dp, *marker, false, c);
883 c->wait_for_complete();
884 r = c->get_return_value();
885 c->release();
886 EXPECT_EQ(0, r);
887 ASSERT_EQ(result.size(), 1);
888
889 /* check tail */
890 info = f->meta();
891 ASSERT_EQ(info.tail_part_num, i / entries_per_part);
892
893 /* try to read all again, see how many entries left */
894 c = R::Rados::aio_create_completion();
895 f->list(&dp, max_entries, marker, &result, &more, c);
896 c->wait_for_complete();
897 r = c->get_return_value();
898 c->release();
899 EXPECT_EQ(0, r);
900 ASSERT_EQ(max_entries - i - 1, result.size());
901 ASSERT_EQ(false, more);
902 }
903
904 /* tail now should point at head */
905 info = f->meta();
906 ASSERT_EQ(info.head_part_num, info.tail_part_num);
907
908 /* check old tails are removed */
909 for (auto i = 0; i < info.tail_part_num; ++i) {
910 c = R::Rados::aio_create_completion();
911 RCf::part_info partinfo;
912 f->get_part_info(i, &partinfo, c);
913 c->wait_for_complete();
914 r = c->get_return_value();
915 c->release();
916 ASSERT_EQ(-ENOENT, r);
917 }
918 /* check current tail exists */
919 std::uint64_t next_ofs;
920 {
921 c = R::Rados::aio_create_completion();
922 RCf::part_info partinfo;
923 f->get_part_info(info.tail_part_num, &partinfo, c);
924 c->wait_for_complete();
925 r = c->get_return_value();
926 c->release();
927 next_ofs = partinfo.next_ofs;
928 }
929 ASSERT_EQ(0, r);
930
931 c = R::Rados::aio_create_completion();
932 f->get_head_info(&dp, [&](int r, RCf::part_info&& p) {
933 ASSERT_EQ(next_ofs, p.next_ofs);
934 }, c);
935 c->wait_for_complete();
936 r = c->get_return_value();
937 c->release();
938 ASSERT_EQ(0, r);
939 }
940
941 TEST_F(AioLegacyFIFO, TestTwoPushers)
942 {
943 static constexpr auto max_part_size = 2048ull;
944 static constexpr auto max_entry_size = 128ull;
945
946 std::unique_ptr<RCf::FIFO> f;
947 auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, std::nullopt,
948 std::nullopt, false, max_part_size,
949 max_entry_size);
950 ASSERT_EQ(0, r);
951 char buf[max_entry_size];
952 memset(buf, 0, sizeof(buf));
953
954 auto [part_header_size, part_entry_overhead] = f->get_part_layout_info();
955 const auto entries_per_part = ((max_part_size - part_header_size) /
956 (max_entry_size + part_entry_overhead));
957 const auto max_entries = entries_per_part * 4 + 1;
958 std::unique_ptr<RCf::FIFO> f2;
959 r = RCf::FIFO::open(&dp, ioctx, fifo_id, &f2, null_yield);
960 std::vector fifos{&f, &f2};
961
962 for (auto i = 0u; i < max_entries; ++i) {
963 cb::list bl;
964 *(int *)buf = i;
965 bl.append(buf, sizeof(buf));
966 auto& f = *fifos[i % fifos.size()];
967 auto c = R::Rados::aio_create_completion();
968 f->push(&dp, bl, c);
969 c->wait_for_complete();
970 r = c->get_return_value();
971 c->release();
972 ASSERT_EQ(0, r);
973 }
974
975 /* list all by both */
976 std::vector<RCf::list_entry> result;
977 bool more = false;
978 auto c = R::Rados::aio_create_completion();
979 f2->list(&dp, max_entries, std::nullopt, &result, &more, c);
980 c->wait_for_complete();
981 r = c->get_return_value();
982 c->release();
983 ASSERT_EQ(0, r);
984 ASSERT_EQ(false, more);
985 ASSERT_EQ(max_entries, result.size());
986
987 c = R::Rados::aio_create_completion();
988 f2->list(&dp, max_entries, std::nullopt, &result, &more, c);
989 c->wait_for_complete();
990 r = c->get_return_value();
991 c->release();
992 ASSERT_EQ(0, r);
993 ASSERT_EQ(false, more);
994 ASSERT_EQ(max_entries, result.size());
995
996 for (auto i = 0u; i < max_entries; ++i) {
997 auto& bl = result[i].data;
998 ASSERT_EQ(i, *(int *)bl.c_str());
999 }
1000 }
1001
1002 TEST_F(AioLegacyFIFO, TestTwoPushersTrim)
1003 {
1004 static constexpr auto max_part_size = 2048ull;
1005 static constexpr auto max_entry_size = 128ull;
1006 std::unique_ptr<RCf::FIFO> f1;
1007 auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f1, null_yield, std::nullopt,
1008 std::nullopt, false, max_part_size,
1009 max_entry_size);
1010 ASSERT_EQ(0, r);
1011
1012 char buf[max_entry_size];
1013 memset(buf, 0, sizeof(buf));
1014
1015 auto [part_header_size, part_entry_overhead] = f1->get_part_layout_info();
1016 const auto entries_per_part = ((max_part_size - part_header_size) /
1017 (max_entry_size + part_entry_overhead));
1018 const auto max_entries = entries_per_part * 4 + 1;
1019
1020 std::unique_ptr<RCf::FIFO> f2;
1021 r = RCf::FIFO::open(&dp, ioctx, fifo_id, &f2, null_yield);
1022 ASSERT_EQ(0, r);
1023
1024 /* push one entry to f2 and the rest to f1 */
1025 for (auto i = 0u; i < max_entries; ++i) {
1026 cb::list bl;
1027 *(int *)buf = i;
1028 bl.append(buf, sizeof(buf));
1029 auto& f = (i < 1 ? f2 : f1);
1030 auto c = R::Rados::aio_create_completion();
1031 f->push(&dp, bl, c);
1032 c->wait_for_complete();
1033 r = c->get_return_value();
1034 c->release();
1035 ASSERT_EQ(0, r);
1036 }
1037
1038 /* trim half by fifo1 */
1039 auto num = max_entries / 2;
1040 std::string marker;
1041 std::vector<RCf::list_entry> result;
1042 bool more = false;
1043 auto c = R::Rados::aio_create_completion();
1044 f1->list(&dp, num, std::nullopt, &result, &more, c);
1045 c->wait_for_complete();
1046 r = c->get_return_value();
1047 c->release();
1048 ASSERT_EQ(0, r);
1049 ASSERT_EQ(true, more);
1050 ASSERT_EQ(num, result.size());
1051
1052 for (auto i = 0u; i < num; ++i) {
1053 auto& bl = result[i].data;
1054 ASSERT_EQ(i, *(int *)bl.c_str());
1055 }
1056
1057 auto& entry = result[num - 1];
1058 marker = entry.marker;
1059 c = R::Rados::aio_create_completion();
1060 f1->trim(&dp, marker, false, c);
1061 c->wait_for_complete();
1062 r = c->get_return_value();
1063 c->release();
1064 ASSERT_EQ(0, r);
1065 /* list what's left by fifo2 */
1066
1067 const auto left = max_entries - num;
1068 c = R::Rados::aio_create_completion();
1069 f2->list(&dp, left, marker, &result, &more, c);
1070 c->wait_for_complete();
1071 r = c->get_return_value();
1072 c->release();
1073 ASSERT_EQ(0, r);
1074 ASSERT_EQ(left, result.size());
1075 ASSERT_EQ(false, more);
1076
1077 for (auto i = num; i < max_entries; ++i) {
1078 auto& bl = result[i - num].data;
1079 ASSERT_EQ(i, *(int *)bl.c_str());
1080 }
1081 }
1082
1083 TEST_F(AioLegacyFIFO, TestPushBatch)
1084 {
1085 static constexpr auto max_part_size = 2048ull;
1086 static constexpr auto max_entry_size = 128ull;
1087
1088 std::unique_ptr<RCf::FIFO> f;
1089 auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, std::nullopt,
1090 std::nullopt, false, max_part_size,
1091 max_entry_size);
1092 ASSERT_EQ(0, r);
1093
1094 char buf[max_entry_size];
1095 memset(buf, 0, sizeof(buf));
1096 auto [part_header_size, part_entry_overhead] = f->get_part_layout_info();
1097 auto entries_per_part = ((max_part_size - part_header_size) /
1098 (max_entry_size + part_entry_overhead));
1099 auto max_entries = entries_per_part * 4 + 1; /* enough entries to span multiple parts */
1100 std::vector<cb::list> bufs;
1101 for (auto i = 0u; i < max_entries; ++i) {
1102 cb::list bl;
1103 *(int *)buf = i;
1104 bl.append(buf, sizeof(buf));
1105 bufs.push_back(bl);
1106 }
1107 ASSERT_EQ(max_entries, bufs.size());
1108
1109 auto c = R::Rados::aio_create_completion();
1110 f->push(&dp, bufs, c);
1111 c->wait_for_complete();
1112 r = c->get_return_value();
1113 c->release();
1114 ASSERT_EQ(0, r);
1115
1116 /* list all */
1117
1118 std::vector<RCf::list_entry> result;
1119 bool more = false;
1120 c = R::Rados::aio_create_completion();
1121 f->list(&dp, max_entries, std::nullopt, &result, &more, c);
1122 c->wait_for_complete();
1123 r = c->get_return_value();
1124 c->release();
1125 ASSERT_EQ(0, r);
1126 ASSERT_EQ(false, more);
1127 ASSERT_EQ(max_entries, result.size());
1128 for (auto i = 0u; i < max_entries; ++i) {
1129 auto& bl = result[i].data;
1130 ASSERT_EQ(i, *(int *)bl.c_str());
1131 }
1132 auto& info = f->meta();
1133 ASSERT_EQ(info.head_part_num, 4);
1134 }
1135
1136 TEST_F(LegacyFIFO, TrimAll)
1137 {
1138 std::unique_ptr<RCf::FIFO> f;
1139 auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield);
1140 ASSERT_EQ(0, r);
1141 static constexpr auto max_entries = 10u;
1142 for (uint32_t i = 0; i < max_entries; ++i) {
1143 cb::list bl;
1144 encode(i, bl);
1145 r = f->push(&dp, bl, null_yield);
1146 ASSERT_EQ(0, r);
1147 }
1148
1149 /* trim one entry */
1150 r = f->trim(&dp, RCf::marker::max().to_string(), false, null_yield);
1151 ASSERT_EQ(-ENODATA, r);
1152
1153 std::vector<RCf::list_entry> result;
1154 bool more;
1155 r = f->list(&dp, 1, std::nullopt, &result, &more, null_yield);
1156 ASSERT_EQ(0, r);
1157 ASSERT_TRUE(result.empty());
1158 }
1159
1160 TEST_F(LegacyFIFO, AioTrimAll)
1161 {
1162 std::unique_ptr<RCf::FIFO> f;
1163 auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield);
1164 ASSERT_EQ(0, r);
1165 static constexpr auto max_entries = 10u;
1166 for (uint32_t i = 0; i < max_entries; ++i) {
1167 cb::list bl;
1168 encode(i, bl);
1169 r = f->push(&dp, bl, null_yield);
1170 ASSERT_EQ(0, r);
1171 }
1172
1173 auto c = R::Rados::aio_create_completion();
1174 f->trim(&dp, RCf::marker::max().to_string(), false, c);
1175 c->wait_for_complete();
1176 r = c->get_return_value();
1177 c->release();
1178 ASSERT_EQ(-ENODATA, r);
1179
1180 std::vector<RCf::list_entry> result;
1181 bool more;
1182 r = f->list(&dp, 1, std::nullopt, &result, &more, null_yield);
1183 ASSERT_EQ(0, r);
1184 ASSERT_TRUE(result.empty());
1185 }