]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/driver/rados/cls_fifo_legacy.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / driver / rados / cls_fifo_legacy.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2020 Red Hat <contact@redhat.com>
7 * Author: Adam C. Emerson
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16 #include <algorithm>
17 #include <cstdint>
18 #include <numeric>
19 #include <optional>
20 #include <string_view>
21
22 #include <fmt/format.h>
23
24 #include "include/rados/librados.hpp"
25
26 #include "include/buffer.h"
27
28 #include "common/async/yield_context.h"
29 #include "common/random_string.h"
30
31 #include "cls/fifo/cls_fifo_types.h"
32 #include "cls/fifo/cls_fifo_ops.h"
33
34 #include "cls_fifo_legacy.h"
35
36 namespace rgw::cls::fifo {
37 namespace cb = ceph::buffer;
38 namespace fifo = rados::cls::fifo;
39
40 using ceph::from_error_code;
41
// Upper bound on how many times a metadata update that lost a race
// (was canceled) is retried before giving up with -ECANCELED.
inline constexpr int MAX_RACE_RETRIES = 10;
43
44 void create_meta(lr::ObjectWriteOperation* op,
45 std::string_view id,
46 std::optional<fifo::objv> objv,
47 std::optional<std::string_view> oid_prefix,
48 bool exclusive,
49 std::uint64_t max_part_size,
50 std::uint64_t max_entry_size)
51 {
52 fifo::op::create_meta cm;
53
54 cm.id = id;
55 cm.version = objv;
56 cm.oid_prefix = oid_prefix;
57 cm.max_part_size = max_part_size;
58 cm.max_entry_size = max_entry_size;
59 cm.exclusive = exclusive;
60
61 cb::list in;
62 encode(cm, in);
63 op->exec(fifo::op::CLASS, fifo::op::CREATE_META, in);
64 }
65
66 int get_meta(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid,
67 std::optional<fifo::objv> objv, fifo::info* info,
68 std::uint32_t* part_header_size,
69 std::uint32_t* part_entry_overhead,
70 uint64_t tid, optional_yield y,
71 bool probe)
72 {
73 lr::ObjectReadOperation op;
74 fifo::op::get_meta gm;
75 gm.version = objv;
76 cb::list in;
77 encode(gm, in);
78 cb::list bl;
79
80 op.exec(fifo::op::CLASS, fifo::op::GET_META, in,
81 &bl, nullptr);
82 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, nullptr, y);
83 if (r >= 0) try {
84 fifo::op::get_meta_reply reply;
85 auto iter = bl.cbegin();
86 decode(reply, iter);
87 if (info) *info = std::move(reply.info);
88 if (part_header_size) *part_header_size = reply.part_header_size;
89 if (part_entry_overhead)
90 *part_entry_overhead = reply.part_entry_overhead;
91 } catch (const cb::error& err) {
92 ldpp_dout(dpp, -1)
93 << __PRETTY_FUNCTION__ << ":" << __LINE__
94 << " decode failed: " << err.what()
95 << " tid=" << tid << dendl;
96 r = from_error_code(err.code());
97 } else if (!(probe && (r == -ENOENT || r == -ENODATA))) {
98 ldpp_dout(dpp, -1)
99 << __PRETTY_FUNCTION__ << ":" << __LINE__
100 << " fifo::op::GET_META failed r=" << r << " tid=" << tid
101 << dendl;
102 }
103 return r;
104 };
105
106 namespace {
107 void update_meta(lr::ObjectWriteOperation* op, const fifo::objv& objv,
108 const fifo::update& update)
109 {
110 fifo::op::update_meta um;
111
112 um.version = objv;
113 um.tail_part_num = update.tail_part_num();
114 um.head_part_num = update.head_part_num();
115 um.min_push_part_num = update.min_push_part_num();
116 um.max_push_part_num = update.max_push_part_num();
117 um.journal_entries_add = std::move(update).journal_entries_add();
118 um.journal_entries_rm = std::move(update).journal_entries_rm();
119
120 cb::list in;
121 encode(um, in);
122 op->exec(fifo::op::CLASS, fifo::op::UPDATE_META, in);
123 }
124
125 void part_init(lr::ObjectWriteOperation* op, fifo::data_params params)
126 {
127 fifo::op::init_part ip;
128
129 ip.params = params;
130
131 cb::list in;
132 encode(ip, in);
133 op->exec(fifo::op::CLASS, fifo::op::INIT_PART, in);
134 }
135
136 int push_part(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid,
137 std::deque<cb::list> data_bufs, std::uint64_t tid,
138 optional_yield y)
139 {
140 lr::ObjectWriteOperation op;
141 fifo::op::push_part pp;
142
143 op.assert_exists();
144
145 pp.data_bufs = data_bufs;
146 pp.total_len = 0;
147
148 for (const auto& bl : data_bufs)
149 pp.total_len += bl.length();
150
151 cb::list in;
152 encode(pp, in);
153 auto retval = 0;
154 op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in, nullptr, &retval);
155 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y, lr::OPERATION_RETURNVEC);
156 if (r < 0) {
157 ldpp_dout(dpp, -1)
158 << __PRETTY_FUNCTION__ << ":" << __LINE__
159 << " fifo::op::PUSH_PART failed r=" << r
160 << " tid=" << tid << dendl;
161 return r;
162 }
163 if (retval < 0) {
164 ldpp_dout(dpp, -1)
165 << __PRETTY_FUNCTION__ << ":" << __LINE__
166 << " error handling response retval=" << retval
167 << " tid=" << tid << dendl;
168 }
169 return retval;
170 }
171
172 void push_part(lr::IoCtx& ioctx, const std::string& oid,
173 std::deque<cb::list> data_bufs, std::uint64_t tid,
174 lr::AioCompletion* c)
175 {
176 lr::ObjectWriteOperation op;
177 fifo::op::push_part pp;
178
179 pp.data_bufs = data_bufs;
180 pp.total_len = 0;
181
182 for (const auto& bl : data_bufs)
183 pp.total_len += bl.length();
184
185 cb::list in;
186 encode(pp, in);
187 op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in);
188 auto r = ioctx.aio_operate(oid, c, &op, lr::OPERATION_RETURNVEC);
189 ceph_assert(r >= 0);
190 }
191
192 void trim_part(lr::ObjectWriteOperation* op,
193 std::uint64_t ofs, bool exclusive)
194 {
195 fifo::op::trim_part tp;
196
197 tp.ofs = ofs;
198 tp.exclusive = exclusive;
199
200 cb::list in;
201 encode(tp, in);
202 op->exec(fifo::op::CLASS, fifo::op::TRIM_PART, in);
203 }
204
205 int list_part(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid,
206 std::uint64_t ofs, std::uint64_t max_entries,
207 std::vector<fifo::part_list_entry>* entries,
208 bool* more, bool* full_part,
209 std::uint64_t tid, optional_yield y)
210 {
211 lr::ObjectReadOperation op;
212 fifo::op::list_part lp;
213
214 lp.ofs = ofs;
215 lp.max_entries = max_entries;
216
217 cb::list in;
218 encode(lp, in);
219 cb::list bl;
220 op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in, &bl, nullptr);
221 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, nullptr, y);
222 if (r >= 0) try {
223 fifo::op::list_part_reply reply;
224 auto iter = bl.cbegin();
225 decode(reply, iter);
226 if (entries) *entries = std::move(reply.entries);
227 if (more) *more = reply.more;
228 if (full_part) *full_part = reply.full_part;
229 } catch (const cb::error& err) {
230 ldpp_dout(dpp, -1)
231 << __PRETTY_FUNCTION__ << ":" << __LINE__
232 << " decode failed: " << err.what()
233 << " tid=" << tid << dendl;
234 r = from_error_code(err.code());
235 } else if (r != -ENOENT) {
236 ldpp_dout(dpp, -1)
237 << __PRETTY_FUNCTION__ << ":" << __LINE__
238 << " fifo::op::LIST_PART failed r=" << r << " tid=" << tid
239 << dendl;
240 }
241 return r;
242 }
243
244 struct list_entry_completion : public lr::ObjectOperationCompletion {
245 CephContext* cct;
246 int* r_out;
247 std::vector<fifo::part_list_entry>* entries;
248 bool* more;
249 bool* full_part;
250 std::uint64_t tid;
251
252 list_entry_completion(CephContext* cct, int* r_out, std::vector<fifo::part_list_entry>* entries,
253 bool* more, bool* full_part, std::uint64_t tid)
254 : cct(cct), r_out(r_out), entries(entries), more(more),
255 full_part(full_part), tid(tid) {}
256 virtual ~list_entry_completion() = default;
257 void handle_completion(int r, bufferlist& bl) override {
258 if (r >= 0) try {
259 fifo::op::list_part_reply reply;
260 auto iter = bl.cbegin();
261 decode(reply, iter);
262 if (entries) *entries = std::move(reply.entries);
263 if (more) *more = reply.more;
264 if (full_part) *full_part = reply.full_part;
265 } catch (const cb::error& err) {
266 lderr(cct)
267 << __PRETTY_FUNCTION__ << ":" << __LINE__
268 << " decode failed: " << err.what()
269 << " tid=" << tid << dendl;
270 r = from_error_code(err.code());
271 } else if (r < 0) {
272 lderr(cct)
273 << __PRETTY_FUNCTION__ << ":" << __LINE__
274 << " fifo::op::LIST_PART failed r=" << r << " tid=" << tid
275 << dendl;
276 }
277 if (r_out) *r_out = r;
278 }
279 };
280
281 lr::ObjectReadOperation list_part(CephContext* cct,
282 std::uint64_t ofs,
283 std::uint64_t max_entries,
284 int* r_out,
285 std::vector<fifo::part_list_entry>* entries,
286 bool* more, bool* full_part,
287 std::uint64_t tid)
288 {
289 lr::ObjectReadOperation op;
290 fifo::op::list_part lp;
291
292 lp.ofs = ofs;
293 lp.max_entries = max_entries;
294
295 cb::list in;
296 encode(lp, in);
297 op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in,
298 new list_entry_completion(cct, r_out, entries, more, full_part,
299 tid));
300 return op;
301 }
302
303 int get_part_info(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid,
304 fifo::part_header* header,
305 std::uint64_t tid, optional_yield y)
306 {
307 lr::ObjectReadOperation op;
308 fifo::op::get_part_info gpi;
309
310 cb::list in;
311 cb::list bl;
312 encode(gpi, in);
313 op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in, &bl, nullptr);
314 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, nullptr, y);
315 if (r >= 0) try {
316 fifo::op::get_part_info_reply reply;
317 auto iter = bl.cbegin();
318 decode(reply, iter);
319 if (header) *header = std::move(reply.header);
320 } catch (const cb::error& err) {
321 ldpp_dout(dpp, -1)
322 << __PRETTY_FUNCTION__ << ":" << __LINE__
323 << " decode failed: " << err.what()
324 << " tid=" << tid << dendl;
325 r = from_error_code(err.code());
326 } else {
327 ldpp_dout(dpp, -1)
328 << __PRETTY_FUNCTION__ << ":" << __LINE__
329 << " fifo::op::GET_PART_INFO failed r=" << r << " tid=" << tid
330 << dendl;
331 }
332 return r;
333 }
334
335 struct partinfo_completion : public lr::ObjectOperationCompletion {
336 CephContext* cct;
337 int* rp;
338 fifo::part_header* h;
339 std::uint64_t tid;
340 partinfo_completion(CephContext* cct, int* rp, fifo::part_header* h,
341 std::uint64_t tid) :
342 cct(cct), rp(rp), h(h), tid(tid) {
343 }
344 virtual ~partinfo_completion() = default;
345 void handle_completion(int r, bufferlist& bl) override {
346 if (r >= 0) try {
347 fifo::op::get_part_info_reply reply;
348 auto iter = bl.cbegin();
349 decode(reply, iter);
350 if (h) *h = std::move(reply.header);
351 } catch (const cb::error& err) {
352 r = from_error_code(err.code());
353 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
354 << " decode failed: " << err.what()
355 << " tid=" << tid << dendl;
356 } else {
357 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
358 << " fifo::op::GET_PART_INFO failed r=" << r << " tid=" << tid
359 << dendl;
360 }
361 if (rp) {
362 *rp = r;
363 }
364 }
365 };
366
367 lr::ObjectReadOperation get_part_info(CephContext* cct,
368 fifo::part_header* header,
369 std::uint64_t tid, int* r = 0)
370 {
371 lr::ObjectReadOperation op;
372 fifo::op::get_part_info gpi;
373
374 cb::list in;
375 cb::list bl;
376 encode(gpi, in);
377 op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in,
378 new partinfo_completion(cct, r, header, tid));
379 return op;
380 }
381 }
382
383 std::optional<marker> FIFO::to_marker(std::string_view s)
384 {
385 marker m;
386 if (s.empty()) {
387 m.num = info.tail_part_num;
388 m.ofs = 0;
389 return m;
390 }
391
392 auto pos = s.find(':');
393 if (pos == s.npos) {
394 return std::nullopt;
395 }
396
397 auto num = s.substr(0, pos);
398 auto ofs = s.substr(pos + 1);
399
400 auto n = ceph::parse<decltype(m.num)>(num);
401 if (!n) {
402 return std::nullopt;
403 }
404 m.num = *n;
405 auto o = ceph::parse<decltype(m.ofs)>(ofs);
406 if (!o) {
407 return std::nullopt;
408 }
409 m.ofs = *o;
410 return m;
411 }
412
413 int FIFO::apply_update(const DoutPrefixProvider *dpp,
414 fifo::info* info,
415 const fifo::objv& objv,
416 const fifo::update& update,
417 std::uint64_t tid)
418 {
419 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
420 << " entering: tid=" << tid << dendl;
421 std::unique_lock l(m);
422 if (objv != info->version) {
423 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
424 << " version mismatch, canceling: tid=" << tid << dendl;
425 return -ECANCELED;
426 }
427
428 info->apply_update(update);
429 return {};
430 }
431
432 int FIFO::_update_meta(const DoutPrefixProvider *dpp, const fifo::update& update,
433 fifo::objv version, bool* pcanceled,
434 std::uint64_t tid, optional_yield y)
435 {
436 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
437 << " entering: tid=" << tid << dendl;
438 lr::ObjectWriteOperation op;
439 bool canceled = false;
440 update_meta(&op, version, update);
441 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y);
442 if (r >= 0 || r == -ECANCELED) {
443 canceled = (r == -ECANCELED);
444 if (!canceled) {
445 r = apply_update(dpp, &info, version, update, tid);
446 if (r < 0) canceled = true;
447 }
448 if (canceled) {
449 r = read_meta(dpp, tid, y);
450 canceled = r < 0 ? false : true;
451 }
452 }
453 if (pcanceled) *pcanceled = canceled;
454 if (canceled) {
455 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
456 << " canceled: tid=" << tid << dendl;
457 }
458 if (r < 0) {
459 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
460 << " returning error: r=" << r << " tid=" << tid << dendl;
461 }
462 return r;
463 }
464
465 struct Updater : public Completion<Updater> {
466 FIFO* fifo;
467 fifo::update update;
468 fifo::objv version;
469 bool reread = false;
470 bool* pcanceled = nullptr;
471 std::uint64_t tid;
472 Updater(const DoutPrefixProvider *dpp, FIFO* fifo, lr::AioCompletion* super,
473 const fifo::update& update, fifo::objv version,
474 bool* pcanceled, std::uint64_t tid)
475 : Completion(dpp, super), fifo(fifo), update(update), version(version),
476 pcanceled(pcanceled) {}
477
478 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
479 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
480 << " entering: tid=" << tid << dendl;
481 if (reread)
482 handle_reread(dpp, std::move(p), r);
483 else
484 handle_update(dpp, std::move(p), r);
485 }
486
487 void handle_update(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
488 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
489 << " handling async update_meta: tid="
490 << tid << dendl;
491 if (r < 0 && r != -ECANCELED) {
492 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
493 << " update failed: r=" << r << " tid=" << tid << dendl;
494 complete(std::move(p), r);
495 return;
496 }
497 bool canceled = (r == -ECANCELED);
498 if (!canceled) {
499 int r = fifo->apply_update(dpp, &fifo->info, version, update, tid);
500 if (r < 0) {
501 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
502 << " update failed, marking canceled: r=" << r
503 << " tid=" << tid << dendl;
504 canceled = true;
505 }
506 }
507 if (canceled) {
508 reread = true;
509 fifo->read_meta(dpp, tid, call(std::move(p)));
510 return;
511 }
512 if (pcanceled)
513 *pcanceled = false;
514 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
515 << " completing: tid=" << tid << dendl;
516 complete(std::move(p), 0);
517 }
518
519 void handle_reread(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
520 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
521 << " handling async read_meta: tid="
522 << tid << dendl;
523 if (r < 0 && pcanceled) {
524 *pcanceled = false;
525 } else if (r >= 0 && pcanceled) {
526 *pcanceled = true;
527 }
528 if (r < 0) {
529 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
530 << " failed dispatching read_meta: r=" << r << " tid="
531 << tid << dendl;
532 } else {
533 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
534 << " completing: tid=" << tid << dendl;
535 }
536 complete(std::move(p), r);
537 }
538 };
539
540 void FIFO::_update_meta(const DoutPrefixProvider *dpp, const fifo::update& update,
541 fifo::objv version, bool* pcanceled,
542 std::uint64_t tid, lr::AioCompletion* c)
543 {
544 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
545 << " entering: tid=" << tid << dendl;
546 lr::ObjectWriteOperation op;
547 update_meta(&op, info.version, update);
548 auto updater = std::make_unique<Updater>(dpp, this, c, update, version, pcanceled,
549 tid);
550 auto r = ioctx.aio_operate(oid, Updater::call(std::move(updater)), &op);
551 assert(r >= 0);
552 }
553
554 int FIFO::create_part(const DoutPrefixProvider *dpp, int64_t part_num, std::uint64_t tid,
555 optional_yield y)
556 {
557 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
558 << " entering: tid=" << tid << dendl;
559 lr::ObjectWriteOperation op;
560 op.create(false); /* We don't need exclusivity, part_init ensures
561 we're creating from the same journal entry. */
562 std::unique_lock l(m);
563 part_init(&op, info.params);
564 auto oid = info.part_oid(part_num);
565 l.unlock();
566 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y);
567 if (r < 0) {
568 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
569 << " part_init failed: r=" << r << " tid="
570 << tid << dendl;
571 }
572 return r;
573 }
574
575 int FIFO::remove_part(const DoutPrefixProvider *dpp, int64_t part_num, std::uint64_t tid,
576 optional_yield y)
577 {
578 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
579 << " entering: tid=" << tid << dendl;
580 lr::ObjectWriteOperation op;
581 op.remove();
582 std::unique_lock l(m);
583 auto oid = info.part_oid(part_num);
584 l.unlock();
585 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y);
586 if (r < 0) {
587 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
588 << " remove failed: r=" << r << " tid="
589 << tid << dendl;
590 }
591 return r;
592 }
593
594 int FIFO::process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y)
595 {
596 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
597 << " entering: tid=" << tid << dendl;
598 std::vector<fifo::journal_entry> processed;
599
600 std::unique_lock l(m);
601 auto tmpjournal = info.journal;
602 auto new_tail = info.tail_part_num;
603 auto new_head = info.head_part_num;
604 auto new_max = info.max_push_part_num;
605 l.unlock();
606
607 int r = 0;
608 for (auto& entry : tmpjournal) {
609 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
610 << " processing entry: entry=" << entry << " tid=" << tid
611 << dendl;
612 switch (entry.op) {
613 using enum fifo::journal_entry::Op;
614 case create:
615 r = create_part(dpp, entry.part_num, tid, y);
616 if (entry.part_num > new_max) {
617 new_max = entry.part_num;
618 }
619 break;
620 case set_head:
621 r = 0;
622 if (entry.part_num > new_head) {
623 new_head = entry.part_num;
624 }
625 break;
626 case remove:
627 r = remove_part(dpp, entry.part_num, tid, y);
628 if (r == -ENOENT) r = 0;
629 if (entry.part_num >= new_tail) {
630 new_tail = entry.part_num + 1;
631 }
632 break;
633 default:
634 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
635 << " unknown journaled op: entry=" << entry << " tid="
636 << tid << dendl;
637 return -EIO;
638 }
639
640 if (r < 0) {
641 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
642 << " processing entry failed: entry=" << entry
643 << " r=" << r << " tid=" << tid << dendl;
644 return -r;
645 }
646
647 processed.push_back(std::move(entry));
648 }
649
650 // Postprocess
651 bool canceled = true;
652
653 for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) {
654 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
655 << " postprocessing: i=" << i << " tid=" << tid << dendl;
656
657 std::optional<int64_t> tail_part_num;
658 std::optional<int64_t> head_part_num;
659 std::optional<int64_t> max_part_num;
660
661 std::unique_lock l(m);
662 auto objv = info.version;
663 if (new_tail > tail_part_num) tail_part_num = new_tail;
664 if (new_head > info.head_part_num) head_part_num = new_head;
665 if (new_max > info.max_push_part_num) max_part_num = new_max;
666 l.unlock();
667
668 if (processed.empty() &&
669 !tail_part_num &&
670 !max_part_num) {
671 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
672 << " nothing to update any more: i=" << i << " tid="
673 << tid << dendl;
674 canceled = false;
675 break;
676 }
677 auto u = fifo::update().tail_part_num(tail_part_num)
678 .head_part_num(head_part_num).max_push_part_num(max_part_num)
679 .journal_entries_rm(processed);
680 r = _update_meta(dpp, u, objv, &canceled, tid, y);
681 if (r < 0) {
682 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
683 << " _update_meta failed: update=" << u
684 << " r=" << r << " tid=" << tid << dendl;
685 break;
686 }
687
688 if (canceled) {
689 std::vector<fifo::journal_entry> new_processed;
690 std::unique_lock l(m);
691 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
692 << " update canceled, retrying: i=" << i << " tid="
693 << tid << dendl;
694 for (auto& e : processed) {
695 if (info.journal.contains(e)) {
696 new_processed.push_back(e);
697 }
698 }
699 processed = std::move(new_processed);
700 }
701 }
702 if (r == 0 && canceled) {
703 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
704 << " canceled too many times, giving up: tid=" << tid << dendl;
705 r = -ECANCELED;
706 }
707 if (r < 0) {
708 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
709 << " failed, r=: " << r << " tid=" << tid << dendl;
710 }
711 return r;
712 }
713
714 int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp,
715 std::int64_t new_part_num, bool is_head,
716 std::uint64_t tid, optional_yield y)
717 {
718 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
719 << " entering: tid=" << tid << dendl;
720 std::unique_lock l(m);
721 using enum fifo::journal_entry::Op;
722 std::vector<fifo::journal_entry> jentries{{ create, new_part_num }};
723 if (info.journal.contains({create, new_part_num}) &&
724 (!is_head || info.journal.contains({set_head, new_part_num}))) {
725 l.unlock();
726 ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
727 << " new part journaled, but not processed: tid="
728 << tid << dendl;
729 auto r = process_journal(dpp, tid, y);
730 if (r < 0) {
731 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
732 << " process_journal failed: r=" << r << " tid=" << tid << dendl;
733 }
734 return r;
735 }
736 auto version = info.version;
737
738 if (is_head) {
739 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
740 << " needs new head: tid=" << tid << dendl;
741 jentries.push_back({ set_head, new_part_num });
742 }
743 l.unlock();
744
745 int r = 0;
746 bool canceled = true;
747 for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) {
748 canceled = false;
749 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
750 << " updating metadata: i=" << i << " tid=" << tid << dendl;
751 auto u = fifo::update{}.journal_entries_add(jentries);
752 r = _update_meta(dpp, u, version, &canceled, tid, y);
753 if (r >= 0 && canceled) {
754 std::unique_lock l(m);
755 version = info.version;
756 auto found = (info.journal.contains({create, new_part_num}) ||
757 info.journal.contains({set_head, new_part_num}));
758 if ((info.max_push_part_num >= new_part_num &&
759 info.head_part_num >= new_part_num)) {
760 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
761 << " raced, but journaled and processed: i=" << i
762 << " tid=" << tid << dendl;
763 return 0;
764 }
765 if (found) {
766 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
767 << " raced, journaled but not processed: i=" << i
768 << " tid=" << tid << dendl;
769 canceled = false;
770 }
771 l.unlock();
772 }
773 if (r < 0) {
774 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
775 << " _update_meta failed: update=" << u << " r=" << r
776 << " tid=" << tid << dendl;
777 return r;
778 }
779 }
780 if (canceled) {
781 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
782 << " canceled too many times, giving up: tid=" << tid << dendl;
783 return -ECANCELED;
784 }
785 r = process_journal(dpp, tid, y);
786 if (r < 0) {
787 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
788 << " process_journal failed: r=" << r << " tid=" << tid << dendl;
789 }
790 return r;
791 }
792
793 int FIFO::_prepare_new_head(const DoutPrefixProvider *dpp,
794 std::int64_t new_head_part_num,
795 std::uint64_t tid, optional_yield y)
796 {
797 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
798 << " entering: tid=" << tid << dendl;
799 std::unique_lock l(m);
800 auto max_push_part_num = info.max_push_part_num;
801 auto version = info.version;
802 l.unlock();
803
804 int r = 0;
805 if (max_push_part_num < new_head_part_num) {
806 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
807 << " need new part: tid=" << tid << dendl;
808 r = _prepare_new_part(dpp, new_head_part_num, true, tid, y);
809 if (r < 0) {
810 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
811 << " _prepare_new_part failed: r=" << r
812 << " tid=" << tid << dendl;
813 return r;
814 }
815 std::unique_lock l(m);
816 if (info.max_push_part_num < new_head_part_num) {
817 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
818 << " inconsistency, push part less than head part: "
819 << " tid=" << tid << dendl;
820 return -EIO;
821 }
822 l.unlock();
823 return 0;
824 }
825
826 using enum fifo::journal_entry::Op;
827 fifo::journal_entry jentry;
828 jentry.op = set_head;
829 jentry.part_num = new_head_part_num;
830
831 r = 0;
832 bool canceled = true;
833 for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) {
834 canceled = false;
835 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
836 << " updating metadata: i=" << i << " tid=" << tid << dendl;
837 auto u = fifo::update{}.journal_entries_add({{ jentry }});
838 r = _update_meta(dpp, u, version, &canceled, tid, y);
839 if (r >= 0 && canceled) {
840 std::unique_lock l(m);
841 auto found = (info.journal.contains({create, new_head_part_num}) ||
842 info.journal.contains({set_head, new_head_part_num}));
843 version = info.version;
844 if ((info.head_part_num >= new_head_part_num)) {
845 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
846 << " raced, but journaled and processed: i=" << i
847 << " tid=" << tid << dendl;
848 return 0;
849 }
850 if (found) {
851 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
852 << " raced, journaled but not processed: i=" << i
853 << " tid=" << tid << dendl;
854 canceled = false;
855 }
856 l.unlock();
857 }
858 if (r < 0) {
859 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
860 << " _update_meta failed: update=" << u << " r=" << r
861 << " tid=" << tid << dendl;
862 return r;
863 }
864 }
865 if (canceled) {
866 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
867 << " canceled too many times, giving up: tid=" << tid << dendl;
868 return -ECANCELED;
869 }
870 r = process_journal(dpp, tid, y);
871 if (r < 0) {
872 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
873 << " process_journal failed: r=" << r << " tid=" << tid << dendl;
874 }
875 return r;
876 }
877
878 struct NewPartPreparer : public Completion<NewPartPreparer> {
879 FIFO* f;
880 std::vector<fifo::journal_entry> jentries;
881 int i = 0;
882 std::int64_t new_part_num;
883 bool canceled = false;
884 uint64_t tid;
885
886 NewPartPreparer(const DoutPrefixProvider *dpp, FIFO* f, lr::AioCompletion* super,
887 std::vector<fifo::journal_entry> jentries,
888 std::int64_t new_part_num,
889 std::uint64_t tid)
890 : Completion(dpp, super), f(f), jentries(std::move(jentries)),
891 new_part_num(new_part_num), tid(tid) {}
892
893 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
894 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
895 << " entering: tid=" << tid << dendl;
896 if (r < 0) {
897 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
898 << " _update_meta failed: r=" << r
899 << " tid=" << tid << dendl;
900 complete(std::move(p), r);
901 return;
902 }
903
904 if (canceled) {
905 using enum fifo::journal_entry::Op;
906 std::unique_lock l(f->m);
907 auto found = (f->info.journal.contains({create, new_part_num}) ||
908 f->info.journal.contains({set_head, new_part_num}));
909 auto max_push_part_num = f->info.max_push_part_num;
910 auto head_part_num = f->info.head_part_num;
911 auto version = f->info.version;
912 l.unlock();
913 if ((max_push_part_num >= new_part_num &&
914 head_part_num >= new_part_num)) {
915 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
916 << " raced, but journaled and processed: i=" << i
917 << " tid=" << tid << dendl;
918 complete(std::move(p), 0);
919 return;
920 }
921 if (i >= MAX_RACE_RETRIES) {
922 complete(std::move(p), -ECANCELED);
923 return;
924 }
925 if (!found) {
926 ++i;
927 f->_update_meta(dpp, fifo::update{}
928 .journal_entries_add(jentries),
929 version, &canceled, tid, call(std::move(p)));
930 return;
931 } else {
932 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
933 << " raced, journaled but not processed: i=" << i
934 << " tid=" << tid << dendl;
935 canceled = false;
936 }
937 // Fall through. We still need to process the journal.
938 }
939 f->process_journal(dpp, tid, super());
940 return;
941 }
942 };
943
944 void FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, std::int64_t new_part_num,
945 bool is_head, std::uint64_t tid, lr::AioCompletion* c)
946 {
947 std::unique_lock l(m);
948 using enum fifo::journal_entry::Op;
949 std::vector<fifo::journal_entry> jentries{{create, new_part_num}};
950 if (info.journal.contains({create, new_part_num}) &&
951 (!is_head || info.journal.contains({set_head, new_part_num}))) {
952 l.unlock();
953 ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
954 << " new part journaled, but not processed: tid="
955 << tid << dendl;
956 process_journal(dpp, tid, c);
957 return;
958 }
959 auto version = info.version;
960
961 if (is_head) {
962 jentries.push_back({ set_head, new_part_num });
963 }
964 l.unlock();
965
966 auto n = std::make_unique<NewPartPreparer>(dpp, this, c, jentries,
967 new_part_num, tid);
968 auto np = n.get();
969 _update_meta(dpp, fifo::update{}.journal_entries_add(jentries), version,
970 &np->canceled, tid, NewPartPreparer::call(std::move(n)));
971 }
972
973 struct NewHeadPreparer : public Completion<NewHeadPreparer> {
974 FIFO* f;
975 int i = 0;
976 bool newpart;
977 std::int64_t new_head_part_num;
978 bool canceled = false;
979 std::uint64_t tid;
980
981 NewHeadPreparer(const DoutPrefixProvider *dpp, FIFO* f, lr::AioCompletion* super,
982 bool newpart, std::int64_t new_head_part_num,
983 std::uint64_t tid)
984 : Completion(dpp, super), f(f), newpart(newpart),
985 new_head_part_num(new_head_part_num), tid(tid) {}
986
987 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
988 if (newpart)
989 handle_newpart(std::move(p), r);
990 else
991 handle_update(dpp, std::move(p), r);
992 }
993
994 void handle_newpart(Ptr&& p, int r) {
995 if (r < 0) {
996 lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
997 << " _prepare_new_part failed: r=" << r
998 << " tid=" << tid << dendl;
999 complete(std::move(p), r);
1000 return;
1001 }
1002 std::unique_lock l(f->m);
1003 if (f->info.max_push_part_num < new_head_part_num) {
1004 l.unlock();
1005 lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1006 << " _prepare_new_part failed: r=" << r
1007 << " tid=" << tid << dendl;
1008 complete(std::move(p), -EIO);
1009 } else {
1010 l.unlock();
1011 complete(std::move(p), 0);
1012 }
1013 }
1014
1015 void handle_update(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
1016 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1017 << " entering: tid=" << tid << dendl;
1018 if (r < 0) {
1019 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1020 << " _update_meta failed: r=" << r
1021 << " tid=" << tid << dendl;
1022 complete(std::move(p), r);
1023 return;
1024 }
1025
1026 if (canceled) {
1027 using enum fifo::journal_entry::Op;
1028 std::unique_lock l(f->m);
1029 auto found = (f->info.journal.contains({create, new_head_part_num }) ||
1030 f->info.journal.contains({set_head, new_head_part_num }));
1031 auto head_part_num = f->info.head_part_num;
1032 auto version = f->info.version;
1033
1034 l.unlock();
1035 if ((head_part_num >= new_head_part_num)) {
1036 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1037 << " raced, but journaled and processed: i=" << i
1038 << " tid=" << tid << dendl;
1039 complete(std::move(p), 0);
1040 return;
1041 }
1042 if (i >= MAX_RACE_RETRIES) {
1043 complete(std::move(p), -ECANCELED);
1044 return;
1045 }
1046 if (!found) {
1047 ++i;
1048 fifo::journal_entry jentry;
1049 jentry.op = set_head;
1050 jentry.part_num = new_head_part_num;
1051 f->_update_meta(dpp, fifo::update{}
1052 .journal_entries_add({{jentry}}),
1053 version, &canceled, tid, call(std::move(p)));
1054 return;
1055 } else {
1056 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1057 << " raced, journaled but not processed: i=" << i
1058 << " tid=" << tid << dendl;
1059 canceled = false;
1060 }
1061 // Fall through. We still need to process the journal.
1062 }
1063 f->process_journal(dpp, tid, super());
1064 return;
1065 }
1066 };
1067
1068 void FIFO::_prepare_new_head(const DoutPrefixProvider *dpp, std::int64_t new_head_part_num,
1069 std::uint64_t tid, lr::AioCompletion* c)
1070 {
1071 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1072 << " entering: tid=" << tid << dendl;
1073 std::unique_lock l(m);
1074 auto max_push_part_num = info.max_push_part_num;
1075 auto version = info.version;
1076 l.unlock();
1077
1078 if (max_push_part_num < new_head_part_num) {
1079 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1080 << " need new part: tid=" << tid << dendl;
1081 auto n = std::make_unique<NewHeadPreparer>(dpp, this, c, true, new_head_part_num,
1082 tid);
1083 _prepare_new_part(dpp, new_head_part_num, true, tid,
1084 NewHeadPreparer::call(std::move(n)));
1085 } else {
1086 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1087 << " updating head: tid=" << tid << dendl;
1088 auto n = std::make_unique<NewHeadPreparer>(dpp, this, c, false, new_head_part_num,
1089 tid);
1090 auto np = n.get();
1091 using enum fifo::journal_entry::Op;
1092 fifo::journal_entry jentry;
1093 jentry.op = set_head;
1094 jentry.part_num = new_head_part_num;
1095 _update_meta(dpp, fifo::update{}.journal_entries_add({{jentry}}), version,
1096 &np->canceled, tid, NewHeadPreparer::call(std::move(n)));
1097 }
1098 }
1099
1100 int FIFO::push_entries(const DoutPrefixProvider *dpp, const std::deque<cb::list>& data_bufs,
1101 std::uint64_t tid, optional_yield y)
1102 {
1103 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1104 << " entering: tid=" << tid << dendl;
1105 std::unique_lock l(m);
1106 auto head_part_num = info.head_part_num;
1107 const auto part_oid = info.part_oid(head_part_num);
1108 l.unlock();
1109
1110 auto r = push_part(dpp, ioctx, part_oid, data_bufs, tid, y);
1111 if (r < 0) {
1112 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1113 << " push_part failed: r=" << r << " tid=" << tid << dendl;
1114 }
1115 return r;
1116 }
1117
1118 void FIFO::push_entries(const std::deque<cb::list>& data_bufs,
1119 std::uint64_t tid, lr::AioCompletion* c)
1120 {
1121 std::unique_lock l(m);
1122 auto head_part_num = info.head_part_num;
1123 const auto part_oid = info.part_oid(head_part_num);
1124 l.unlock();
1125
1126 push_part(ioctx, part_oid, data_bufs, tid, c);
1127 }
1128
1129 int FIFO::trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs,
1130 bool exclusive, std::uint64_t tid,
1131 optional_yield y)
1132 {
1133 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1134 << " entering: tid=" << tid << dendl;
1135 lr::ObjectWriteOperation op;
1136 std::unique_lock l(m);
1137 const auto part_oid = info.part_oid(part_num);
1138 l.unlock();
1139 rgw::cls::fifo::trim_part(&op, ofs, exclusive);
1140 auto r = rgw_rados_operate(dpp, ioctx, part_oid, &op, y);
1141 if (r < 0) {
1142 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1143 << " trim_part failed: r=" << r << " tid=" << tid << dendl;
1144 }
1145 return 0;
1146 }
1147
1148 void FIFO::trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs,
1149 bool exclusive, std::uint64_t tid,
1150 lr::AioCompletion* c)
1151 {
1152 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1153 << " entering: tid=" << tid << dendl;
1154 lr::ObjectWriteOperation op;
1155 std::unique_lock l(m);
1156 const auto part_oid = info.part_oid(part_num);
1157 l.unlock();
1158 rgw::cls::fifo::trim_part(&op, ofs, exclusive);
1159 auto r = ioctx.aio_operate(part_oid, c, &op);
1160 ceph_assert(r >= 0);
1161 }
1162
1163 int FIFO::open(const DoutPrefixProvider *dpp, lr::IoCtx ioctx, std::string oid, std::unique_ptr<FIFO>* fifo,
1164 optional_yield y, std::optional<fifo::objv> objv,
1165 bool probe)
1166 {
1167 ldpp_dout(dpp, 20)
1168 << __PRETTY_FUNCTION__ << ":" << __LINE__
1169 << " entering" << dendl;
1170 fifo::info info;
1171 std::uint32_t size;
1172 std::uint32_t over;
1173 int r = get_meta(dpp, ioctx, std::move(oid), objv, &info, &size, &over, 0, y,
1174 probe);
1175 if (r < 0) {
1176 if (!(probe && (r == -ENOENT || r == -ENODATA))) {
1177 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1178 << " get_meta failed: r=" << r << dendl;
1179 }
1180 return r;
1181 }
1182 std::unique_ptr<FIFO> f(new FIFO(std::move(ioctx), oid));
1183 f->info = info;
1184 f->part_header_size = size;
1185 f->part_entry_overhead = over;
1186 // If there are journal entries, process them, in case
1187 // someone crashed mid-transaction.
1188 if (!info.journal.empty()) {
1189 ldpp_dout(dpp, 20)
1190 << __PRETTY_FUNCTION__ << ":" << __LINE__
1191 << " processing leftover journal" << dendl;
1192 r = f->process_journal(dpp, 0, y);
1193 if (r < 0) {
1194 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1195 << " process_journal failed: r=" << r << dendl;
1196 return r;
1197 }
1198 }
1199 *fifo = std::move(f);
1200 return 0;
1201 }
1202
1203 int FIFO::create(const DoutPrefixProvider *dpp, lr::IoCtx ioctx, std::string oid, std::unique_ptr<FIFO>* fifo,
1204 optional_yield y, std::optional<fifo::objv> objv,
1205 std::optional<std::string_view> oid_prefix,
1206 bool exclusive, std::uint64_t max_part_size,
1207 std::uint64_t max_entry_size)
1208 {
1209 ldpp_dout(dpp, 20)
1210 << __PRETTY_FUNCTION__ << ":" << __LINE__
1211 << " entering" << dendl;
1212 lr::ObjectWriteOperation op;
1213 create_meta(&op, oid, objv, oid_prefix, exclusive, max_part_size,
1214 max_entry_size);
1215 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y);
1216 if (r < 0) {
1217 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1218 << " create_meta failed: r=" << r << dendl;
1219 return r;
1220 }
1221 r = open(dpp, std::move(ioctx), std::move(oid), fifo, y, objv);
1222 return r;
1223 }
1224
1225 int FIFO::read_meta(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y) {
1226 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1227 << " entering: tid=" << tid << dendl;
1228 fifo::info _info;
1229 std::uint32_t _phs;
1230 std::uint32_t _peo;
1231
1232 auto r = get_meta(dpp, ioctx, oid, std::nullopt, &_info, &_phs, &_peo, tid, y);
1233 if (r < 0) {
1234 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1235 << " get_meta failed: r=" << r << " tid=" << tid << dendl;
1236 return r;
1237 }
1238 std::unique_lock l(m);
1239 // We have a newer version already!
1240 if (_info.version.same_or_later(this->info.version)) {
1241 info = std::move(_info);
1242 part_header_size = _phs;
1243 part_entry_overhead = _peo;
1244 }
1245 return 0;
1246 }
1247
1248 int FIFO::read_meta(const DoutPrefixProvider *dpp, optional_yield y) {
1249 std::unique_lock l(m);
1250 auto tid = ++next_tid;
1251 l.unlock();
1252 return read_meta(dpp, tid, y);
1253 }
1254
1255 struct Reader : public Completion<Reader> {
1256 FIFO* fifo;
1257 cb::list bl;
1258 std::uint64_t tid;
1259 Reader(const DoutPrefixProvider *dpp, FIFO* fifo, lr::AioCompletion* super, std::uint64_t tid)
1260 : Completion(dpp, super), fifo(fifo), tid(tid) {}
1261
1262 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
1263 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1264 << " entering: tid=" << tid << dendl;
1265 if (r >= 0) try {
1266 fifo::op::get_meta_reply reply;
1267 auto iter = bl.cbegin();
1268 decode(reply, iter);
1269 std::unique_lock l(fifo->m);
1270 if (reply.info.version.same_or_later(fifo->info.version)) {
1271 fifo->info = std::move(reply.info);
1272 fifo->part_header_size = reply.part_header_size;
1273 fifo->part_entry_overhead = reply.part_entry_overhead;
1274 }
1275 } catch (const cb::error& err) {
1276 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1277 << " failed to decode response err=" << err.what()
1278 << " tid=" << tid << dendl;
1279 r = from_error_code(err.code());
1280 } else {
1281 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1282 << " read_meta failed r=" << r
1283 << " tid=" << tid << dendl;
1284 }
1285 complete(std::move(p), r);
1286 }
1287 };
1288
1289 void FIFO::read_meta(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c)
1290 {
1291 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1292 << " entering: tid=" << tid << dendl;
1293 lr::ObjectReadOperation op;
1294 fifo::op::get_meta gm;
1295 cb::list in;
1296 encode(gm, in);
1297 auto reader = std::make_unique<Reader>(dpp, this, c, tid);
1298 auto rp = reader.get();
1299 auto r = ioctx.aio_exec(oid, Reader::call(std::move(reader)), fifo::op::CLASS,
1300 fifo::op::GET_META, in, &rp->bl);
1301 assert(r >= 0);
1302 }
1303
1304 const fifo::info& FIFO::meta() const {
1305 return info;
1306 }
1307
1308 std::pair<std::uint32_t, std::uint32_t> FIFO::get_part_layout_info() const {
1309 return {part_header_size, part_entry_overhead};
1310 }
1311
1312 int FIFO::push(const DoutPrefixProvider *dpp, const cb::list& bl, optional_yield y) {
1313 return push(dpp, std::vector{ bl }, y);
1314 }
1315
1316 void FIFO::push(const DoutPrefixProvider *dpp, const cb::list& bl, lr::AioCompletion* c) {
1317 push(dpp, std::vector{ bl }, c);
1318 }
1319
1320 int FIFO::push(const DoutPrefixProvider *dpp, const std::vector<cb::list>& data_bufs, optional_yield y)
1321 {
1322 std::unique_lock l(m);
1323 auto tid = ++next_tid;
1324 auto max_entry_size = info.params.max_entry_size;
1325 auto need_new_head = info.need_new_head();
1326 auto head_part_num = info.head_part_num;
1327 l.unlock();
1328 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1329 << " entering: tid=" << tid << dendl;
1330 if (data_bufs.empty()) {
1331 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1332 << " empty push, returning success tid=" << tid << dendl;
1333 return 0;
1334 }
1335
1336 // Validate sizes
1337 for (const auto& bl : data_bufs) {
1338 if (bl.length() > max_entry_size) {
1339 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1340 << " entry bigger than max_entry_size tid=" << tid << dendl;
1341 return -E2BIG;
1342 }
1343 }
1344
1345 int r = 0;
1346 if (need_new_head) {
1347 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1348 << " need new head tid=" << tid << dendl;
1349 r = _prepare_new_head(dpp, head_part_num + 1, tid, y);
1350 if (r < 0) {
1351 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1352 << " _prepare_new_head failed: r=" << r
1353 << " tid=" << tid << dendl;
1354 return r;
1355 }
1356 }
1357
1358 std::deque<cb::list> remaining(data_bufs.begin(), data_bufs.end());
1359 std::deque<cb::list> batch;
1360
1361 uint64_t batch_len = 0;
1362 auto retries = 0;
1363 bool canceled = true;
1364 while ((!remaining.empty() || !batch.empty()) &&
1365 (retries <= MAX_RACE_RETRIES)) {
1366 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1367 << " preparing push: remaining=" << remaining.size()
1368 << " batch=" << batch.size() << " retries=" << retries
1369 << " tid=" << tid << dendl;
1370 std::unique_lock l(m);
1371 head_part_num = info.head_part_num;
1372 auto max_part_size = info.params.max_part_size;
1373 auto overhead = part_entry_overhead;
1374 l.unlock();
1375
1376 while (!remaining.empty() &&
1377 (remaining.front().length() + batch_len <= max_part_size)) {
1378 /* We can send entries with data_len up to max_entry_size,
1379 however, we want to also account the overhead when
1380 dealing with multiple entries. Previous check doesn't
1381 account for overhead on purpose. */
1382 batch_len += remaining.front().length() + overhead;
1383 batch.push_back(std::move(remaining.front()));
1384 remaining.pop_front();
1385 }
1386 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1387 << " prepared push: remaining=" << remaining.size()
1388 << " batch=" << batch.size() << " retries=" << retries
1389 << " batch_len=" << batch_len
1390 << " tid=" << tid << dendl;
1391
1392 auto r = push_entries(dpp, batch, tid, y);
1393 if (r == -ERANGE) {
1394 canceled = true;
1395 ++retries;
1396 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1397 << " need new head tid=" << tid << dendl;
1398 r = _prepare_new_head(dpp, head_part_num + 1, tid, y);
1399 if (r < 0) {
1400 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1401 << " prepare_new_head failed: r=" << r
1402 << " tid=" << tid << dendl;
1403 return r;
1404 }
1405 r = 0;
1406 continue;
1407 }
1408 if (r == -ENOENT) {
1409 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1410 << " racing client trimmed part, rereading metadata "
1411 << "tid=" << tid << dendl;
1412 canceled = true;
1413 ++retries;
1414 r = read_meta(dpp, y);
1415 if (r < 0) {
1416 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1417 << " read_meta failed: r=" << r
1418 << " tid=" << tid << dendl;
1419 return r;
1420 }
1421 r = 0;
1422 continue;
1423 }
1424 if (r < 0) {
1425 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1426 << " push_entries failed: r=" << r
1427 << " tid=" << tid << dendl;
1428 return r;
1429 }
1430 // Made forward progress!
1431 canceled = false;
1432 retries = 0;
1433 batch_len = 0;
1434 if (r == ssize(batch)) {
1435 batch.clear();
1436 } else {
1437 batch.erase(batch.begin(), batch.begin() + r);
1438 for (const auto& b : batch) {
1439 batch_len += b.length() + part_entry_overhead;
1440 }
1441 }
1442 }
1443 if (canceled) {
1444 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1445 << " canceled too many times, giving up: tid=" << tid << dendl;
1446 return -ECANCELED;
1447 }
1448 return 0;
1449 }
1450
1451 struct Pusher : public Completion<Pusher> {
1452 FIFO* f;
1453 std::deque<cb::list> remaining;
1454 std::deque<cb::list> batch;
1455 int i = 0;
1456 std::int64_t head_part_num;
1457 std::uint64_t tid;
1458 enum { pushing, new_heading, meta_reading } state = pushing;
1459
1460 void prep_then_push(const DoutPrefixProvider *dpp, Ptr&& p, const unsigned successes) {
1461 std::unique_lock l(f->m);
1462 auto max_part_size = f->info.params.max_part_size;
1463 auto part_entry_overhead = f->part_entry_overhead;
1464 head_part_num = f->info.head_part_num;
1465 l.unlock();
1466
1467 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1468 << " preparing push: remaining=" << remaining.size()
1469 << " batch=" << batch.size() << " i=" << i
1470 << " tid=" << tid << dendl;
1471
1472 uint64_t batch_len = 0;
1473 if (successes > 0) {
1474 if (successes == batch.size()) {
1475 batch.clear();
1476 } else {
1477 batch.erase(batch.begin(), batch.begin() + successes);
1478 for (const auto& b : batch) {
1479 batch_len += b.length() + part_entry_overhead;
1480 }
1481 }
1482 }
1483
1484 if (batch.empty() && remaining.empty()) {
1485 complete(std::move(p), 0);
1486 return;
1487 }
1488
1489 while (!remaining.empty() &&
1490 (remaining.front().length() + batch_len <= max_part_size)) {
1491
1492 /* We can send entries with data_len up to max_entry_size,
1493 however, we want to also account the overhead when
1494 dealing with multiple entries. Previous check doesn't
1495 account for overhead on purpose. */
1496 batch_len += remaining.front().length() + part_entry_overhead;
1497 batch.push_back(std::move(remaining.front()));
1498 remaining.pop_front();
1499 }
1500 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1501 << " prepared push: remaining=" << remaining.size()
1502 << " batch=" << batch.size() << " i=" << i
1503 << " batch_len=" << batch_len
1504 << " tid=" << tid << dendl;
1505 push(std::move(p));
1506 }
1507
1508 void push(Ptr&& p) {
1509 f->push_entries(batch, tid, call(std::move(p)));
1510 }
1511
1512 void new_head(const DoutPrefixProvider *dpp, Ptr&& p) {
1513 state = new_heading;
1514 f->_prepare_new_head(dpp, head_part_num + 1, tid, call(std::move(p)));
1515 }
1516
1517 void read_meta(const DoutPrefixProvider *dpp, Ptr&& p) {
1518 ++i;
1519 state = meta_reading;
1520 f->read_meta(dpp, tid, call(std::move(p)));
1521 }
1522
1523 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
1524 switch (state) {
1525 case pushing:
1526 if (r == -ERANGE) {
1527 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1528 << " need new head tid=" << tid << dendl;
1529 new_head(dpp, std::move(p));
1530 return;
1531 }
1532 if (r == -ENOENT) {
1533 if (i > MAX_RACE_RETRIES) {
1534 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1535 << " racing client deleted part, but we're out"
1536 << " of retries: tid=" << tid << dendl;
1537 complete(std::move(p), r);
1538 }
1539 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1540 << " racing client deleted part: tid=" << tid << dendl;
1541 read_meta(dpp, std::move(p));
1542 return;
1543 }
1544 if (r < 0) {
1545 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1546 << " push_entries failed: r=" << r
1547 << " tid=" << tid << dendl;
1548 complete(std::move(p), r);
1549 return;
1550 }
1551 i = 0; // We've made forward progress, so reset the race counter!
1552 prep_then_push(dpp, std::move(p), r);
1553 break;
1554
1555 case new_heading:
1556 if (r < 0) {
1557 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1558 << " prepare_new_head failed: r=" << r
1559 << " tid=" << tid << dendl;
1560 complete(std::move(p), r);
1561 return;
1562 }
1563 state = pushing;
1564 handle_new_head(dpp, std::move(p), r);
1565 break;
1566
1567 case meta_reading:
1568 if (r < 0) {
1569 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1570 << " read_meta failed: r=" << r
1571 << " tid=" << tid << dendl;
1572 complete(std::move(p), r);
1573 return;
1574 }
1575 state = pushing;
1576 prep_then_push(dpp, std::move(p), r);
1577 break;
1578 }
1579 }
1580
1581 void handle_new_head(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
1582 if (r == -ECANCELED) {
1583 if (p->i == MAX_RACE_RETRIES) {
1584 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1585 << " canceled too many times, giving up: tid=" << tid << dendl;
1586 complete(std::move(p), -ECANCELED);
1587 return;
1588 }
1589 ++p->i;
1590 } else if (r) {
1591 complete(std::move(p), r);
1592 return;
1593 }
1594
1595 if (p->batch.empty()) {
1596 prep_then_push(dpp, std::move(p), 0);
1597 return;
1598 } else {
1599 push(std::move(p));
1600 return;
1601 }
1602 }
1603
1604 Pusher(const DoutPrefixProvider *dpp, FIFO* f, std::deque<cb::list>&& remaining,
1605 std::int64_t head_part_num, std::uint64_t tid,
1606 lr::AioCompletion* super)
1607 : Completion(dpp, super), f(f), remaining(std::move(remaining)),
1608 head_part_num(head_part_num), tid(tid) {}
1609 };
1610
1611 void FIFO::push(const DoutPrefixProvider *dpp, const std::vector<cb::list>& data_bufs,
1612 lr::AioCompletion* c)
1613 {
1614 std::unique_lock l(m);
1615 auto tid = ++next_tid;
1616 auto max_entry_size = info.params.max_entry_size;
1617 auto need_new_head = info.need_new_head();
1618 auto head_part_num = info.head_part_num;
1619 l.unlock();
1620 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1621 << " entering: tid=" << tid << dendl;
1622 auto p = std::make_unique<Pusher>(dpp, this, std::deque<cb::list>(data_bufs.begin(), data_bufs.end()),
1623 head_part_num, tid, c);
1624 // Validate sizes
1625 for (const auto& bl : data_bufs) {
1626 if (bl.length() > max_entry_size) {
1627 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1628 << " entry bigger than max_entry_size tid=" << tid << dendl;
1629 Pusher::complete(std::move(p), -E2BIG);
1630 return;
1631 }
1632 }
1633
1634 if (data_bufs.empty() ) {
1635 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1636 << " empty push, returning success tid=" << tid << dendl;
1637 Pusher::complete(std::move(p), 0);
1638 return;
1639 }
1640
1641 if (need_new_head) {
1642 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1643 << " need new head tid=" << tid << dendl;
1644 p->new_head(dpp, std::move(p));
1645 } else {
1646 p->prep_then_push(dpp, std::move(p), 0);
1647 }
1648 }
1649
1650 int FIFO::list(const DoutPrefixProvider *dpp, int max_entries,
1651 std::optional<std::string_view> markstr,
1652 std::vector<list_entry>* presult, bool* pmore,
1653 optional_yield y)
1654 {
1655 std::unique_lock l(m);
1656 auto tid = ++next_tid;
1657 std::int64_t part_num = info.tail_part_num;
1658 l.unlock();
1659 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1660 << " entering: tid=" << tid << dendl;
1661 std::uint64_t ofs = 0;
1662 if (markstr) {
1663 auto marker = to_marker(*markstr);
1664 if (!marker) {
1665 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1666 << " invalid marker string: " << markstr
1667 << " tid= "<< tid << dendl;
1668 return -EINVAL;
1669 }
1670 part_num = marker->num;
1671 ofs = marker->ofs;
1672 }
1673
1674 std::vector<list_entry> result;
1675 result.reserve(max_entries);
1676 bool more = false;
1677
1678 std::vector<fifo::part_list_entry> entries;
1679 int r = 0;
1680 while (max_entries > 0) {
1681 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1682 << " max_entries=" << max_entries << " tid=" << tid << dendl;
1683 bool part_more = false;
1684 bool part_full = false;
1685
1686 std::unique_lock l(m);
1687 auto part_oid = info.part_oid(part_num);
1688 l.unlock();
1689
1690 r = list_part(dpp, ioctx, part_oid, ofs, max_entries, &entries,
1691 &part_more, &part_full, tid, y);
1692 if (r == -ENOENT) {
1693 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1694 << " missing part, rereading metadata"
1695 << " tid= "<< tid << dendl;
1696 r = read_meta(dpp, tid, y);
1697 if (r < 0) {
1698 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1699 << " read_meta failed: r=" << r
1700 << " tid= "<< tid << dendl;
1701 return r;
1702 }
1703 if (part_num < info.tail_part_num) {
1704 /* raced with trim? restart */
1705 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1706 << " raced with trim, restarting: tid=" << tid << dendl;
1707 max_entries += result.size();
1708 result.clear();
1709 std::unique_lock l(m);
1710 part_num = info.tail_part_num;
1711 l.unlock();
1712 ofs = 0;
1713 continue;
1714 }
1715 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1716 << " assuming part was not written yet, so end of data: "
1717 << "tid=" << tid << dendl;
1718 more = false;
1719 r = 0;
1720 break;
1721 }
1722 if (r < 0) {
1723 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1724 << " list_entries failed: r=" << r
1725 << " tid= "<< tid << dendl;
1726 return r;
1727 }
1728 more = part_full || part_more;
1729 for (auto& entry : entries) {
1730 list_entry e;
1731 e.data = std::move(entry.data);
1732 e.marker = marker{part_num, entry.ofs}.to_string();
1733 e.mtime = entry.mtime;
1734 result.push_back(std::move(e));
1735 --max_entries;
1736 if (max_entries == 0)
1737 break;
1738 }
1739 entries.clear();
1740 if (max_entries > 0 &&
1741 part_more) {
1742 }
1743
1744 if (!part_full) {
1745 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1746 << " head part is not full, so we can assume we're done: "
1747 << "tid=" << tid << dendl;
1748 break;
1749 }
1750 if (!part_more) {
1751 ++part_num;
1752 ofs = 0;
1753 }
1754 }
1755 if (presult)
1756 *presult = std::move(result);
1757 if (pmore)
1758 *pmore = more;
1759 return 0;
1760 }
1761
1762 int FIFO::trim(const DoutPrefixProvider *dpp, std::string_view markstr, bool exclusive, optional_yield y)
1763 {
1764 bool overshoot = false;
1765 auto marker = to_marker(markstr);
1766 if (!marker) {
1767 return -EINVAL;
1768 }
1769 auto part_num = marker->num;
1770 auto ofs = marker->ofs;
1771 std::unique_lock l(m);
1772 auto tid = ++next_tid;
1773 auto hn = info.head_part_num;
1774 const auto max_part_size = info.params.max_part_size;
1775 if (part_num > hn) {
1776 l.unlock();
1777 auto r = read_meta(dpp, tid, y);
1778 if (r < 0) {
1779 return r;
1780 }
1781 l.lock();
1782 auto hn = info.head_part_num;
1783 if (part_num > hn) {
1784 overshoot = true;
1785 part_num = hn;
1786 ofs = max_part_size;
1787 }
1788 }
1789 if (part_num < info.tail_part_num) {
1790 return -ENODATA;
1791 }
1792 auto pn = info.tail_part_num;
1793 l.unlock();
1794 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1795 << " entering: tid=" << tid << dendl;
1796
1797 int r = 0;
1798 while (pn < part_num) {
1799 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1800 << " pn=" << pn << " tid=" << tid << dendl;
1801 std::unique_lock l(m);
1802 l.unlock();
1803 r = trim_part(dpp, pn, max_part_size, false, tid, y);
1804 if (r < 0 && r == -ENOENT) {
1805 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1806 << " trim_part failed: r=" << r
1807 << " tid= "<< tid << dendl;
1808 return r;
1809 }
1810 ++pn;
1811 }
1812 r = trim_part(dpp, part_num, ofs, exclusive, tid, y);
1813 if (r < 0 && r != -ENOENT) {
1814 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1815 << " trim_part failed: r=" << r
1816 << " tid= "<< tid << dendl;
1817 return r;
1818 }
1819
1820 l.lock();
1821 auto tail_part_num = info.tail_part_num;
1822 auto objv = info.version;
1823 l.unlock();
1824 bool canceled = tail_part_num < part_num;
1825 int retries = 0;
1826 while ((tail_part_num < part_num) &&
1827 canceled &&
1828 (retries <= MAX_RACE_RETRIES)) {
1829 r = _update_meta(dpp, fifo::update{}.tail_part_num(part_num), objv, &canceled,
1830 tid, y);
1831 if (r < 0) {
1832 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1833 << " _update_meta failed: r=" << r
1834 << " tid= "<< tid << dendl;
1835 return r;
1836 }
1837 if (canceled) {
1838 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1839 << " canceled: retries=" << retries
1840 << " tid=" << tid << dendl;
1841 l.lock();
1842 tail_part_num = info.tail_part_num;
1843 objv = info.version;
1844 l.unlock();
1845 ++retries;
1846 }
1847 }
1848 if (canceled) {
1849 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1850 << " canceled too many times, giving up: tid=" << tid << dendl;
1851 return -EIO;
1852 }
1853 return overshoot ? -ENODATA : 0;
1854 }
1855
// Completion driving the async FIFO::trim.
//
// Each callback arrives in handle(); the flags select what just finished:
//  - reread:  read_meta issued because the marker's part was beyond the
//             cached head. part_num/ofs are re-clamped (setting overshoot)
//             and trimming restarts from the current tail.
//  - !update: trim_part of a part preceding part_num (trimmed to
//             max_part_size, non-exclusively). The next preceding part is
//             trimmed, or, once pn reaches part_num, part_num itself.
//  - update:  the final trim_part, or an _update_meta advancing
//             tail_part_num to part_num, retried while canceled up to
//             MAX_RACE_RETRIES.
// Outside the reread step, -ENOENT from a callback is treated as success.
struct Trimmer : public Completion<Trimmer> {
  FIFO* fifo;
  std::int64_t part_num;   // part containing the trim marker
  std::uint64_t ofs;       // offset within part_num to trim up to
  std::int64_t pn;         // next preceding part to trim (post-incremented per issue)
  bool exclusive;
  std::uint64_t tid;
  bool update = false;     // final trim / tail-update phase has begun
  bool reread = false;     // the pending callback is for read_meta
  bool canceled = false;   // written by _update_meta through a pointer
  bool overshoot = false;  // marker was past the head; finish with -ENODATA
  int retries = 0;         // _update_meta race retries

  Trimmer(const DoutPrefixProvider *dpp, FIFO* fifo, std::int64_t part_num, std::uint64_t ofs, std::int64_t pn,
	  bool exclusive, lr::AioCompletion* super, std::uint64_t tid)
    : Completion(dpp, super), fifo(fifo), part_num(part_num), ofs(ofs), pn(pn),
      exclusive(exclusive), tid(tid) {}

  void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
    ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
		       << " entering: tid=" << tid << dendl;

    if (reread) {
      // Callback for read_meta: clamp the target to the fresh head/tail.
      reread = false;
      if (r < 0) {
	ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
			   << " read_meta failed: r="
			   << r << " tid=" << tid << dendl;
	complete(std::move(p), r);
	return;
      }
      std::unique_lock l(fifo->m);
      auto hn = fifo->info.head_part_num;
      const auto max_part_size = fifo->info.params.max_part_size;
      const auto tail_part_num = fifo->info.tail_part_num;
      l.unlock();
      if (part_num > hn) {
	part_num = hn;
	ofs = max_part_size;
	overshoot = true;
      }
      if (part_num < tail_part_num) {
	complete(std::move(p), -ENODATA);
	return;
      }
      pn = tail_part_num;
      if (pn < part_num) {
	ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
			   << " pn=" << pn << " tid=" << tid << dendl;
	fifo->trim_part(dpp, pn++, max_part_size, false, tid,
			call(std::move(p)));
      } else {
	update = true;
	canceled = tail_part_num < part_num;
	fifo->trim_part(dpp, part_num, ofs, exclusive, tid, call(std::move(p)));
      }
      return;
    }

    // A missing part has nothing left to trim.
    if (r == -ENOENT) {
      r = 0;
    }

    if (r < 0) {
      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
			 << (update ? " update_meta " : " trim ") << "failed: r="
			 << r << " tid=" << tid << dendl;
      complete(std::move(p), r);
      return;
    }

    if (!update) {
      // A preceding part finished: trim the next one, or move on to part_num.
      ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
			 << " handling preceding trim callback: tid=" << tid << dendl;
      retries = 0;
      if (pn < part_num) {
	ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
			   << " pn=" << pn << " tid=" << tid << dendl;
	std::unique_lock l(fifo->m);
	const auto max_part_size = fifo->info.params.max_part_size;
	l.unlock();
	fifo->trim_part(dpp, pn++, max_part_size, false, tid,
			call(std::move(p)));
	return;
      }

      std::unique_lock l(fifo->m);
      const auto tail_part_num = fifo->info.tail_part_num;
      l.unlock();
      update = true;
      // Only an update that is still needed starts out as "canceled",
      // which is what triggers the _update_meta below.
      canceled = tail_part_num < part_num;
      fifo->trim_part(dpp, part_num, ofs, exclusive, tid, call(std::move(p)));
      return;
    }

    // Final trim (or a previous _update_meta) finished: advance the tail if
    // it still lags behind part_num and the last attempt was canceled.
    ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
		       << " handling update-needed callback: tid=" << tid << dendl;
    std::unique_lock l(fifo->m);
    auto tail_part_num = fifo->info.tail_part_num;
    auto objv = fifo->info.version;
    l.unlock();
    if ((tail_part_num < part_num) &&
	canceled) {
      if (retries > MAX_RACE_RETRIES) {
	ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
			   << " canceled too many times, giving up: tid=" << tid << dendl;
	complete(std::move(p), -EIO);
	return;
      }
      ++retries;
      fifo->_update_meta(dpp, fifo::update{}
			 .tail_part_num(part_num), objv, &canceled,
			 tid, call(std::move(p)));
    } else {
      complete(std::move(p), overshoot ? -ENODATA : 0);
    }
  }
};
1974
1975 void FIFO::trim(const DoutPrefixProvider *dpp, std::string_view markstr, bool exclusive,
1976 lr::AioCompletion* c) {
1977 auto marker = to_marker(markstr);
1978 auto realmark = marker.value_or(::rgw::cls::fifo::marker{});
1979 std::unique_lock l(m);
1980 const auto hn = info.head_part_num;
1981 const auto max_part_size = info.params.max_part_size;
1982 const auto pn = info.tail_part_num;
1983 const auto part_oid = info.part_oid(pn);
1984 auto tid = ++next_tid;
1985 l.unlock();
1986 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1987 << " entering: tid=" << tid << dendl;
1988 auto trimmer = std::make_unique<Trimmer>(dpp, this, realmark.num, realmark.ofs,
1989 pn, exclusive, c, tid);
1990 if (!marker) {
1991 Trimmer::complete(std::move(trimmer), -EINVAL);
1992 return;
1993 }
1994 ++trimmer->pn;
1995 auto ofs = marker->ofs;
1996 if (marker->num > hn) {
1997 trimmer->reread = true;
1998 read_meta(dpp, tid, Trimmer::call(std::move(trimmer)));
1999 return;
2000 }
2001 if (pn < marker->num) {
2002 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2003 << " pn=" << pn << " tid=" << tid << dendl;
2004 ofs = max_part_size;
2005 } else {
2006 trimmer->update = true;
2007 }
2008 trim_part(dpp, pn, ofs, exclusive, tid, Trimmer::call(std::move(trimmer)));
2009 }
2010
2011 int FIFO::get_part_info(const DoutPrefixProvider *dpp, int64_t part_num,
2012 fifo::part_header* header,
2013 optional_yield y)
2014 {
2015 std::unique_lock l(m);
2016 const auto part_oid = info.part_oid(part_num);
2017 auto tid = ++next_tid;
2018 l.unlock();
2019 auto r = rgw::cls::fifo::get_part_info(dpp, ioctx, part_oid, header, tid, y);
2020 if (r < 0) {
2021 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
2022 << " get_part_info failed: r="
2023 << r << " tid=" << tid << dendl;
2024 }
2025 return r;
2026 }
2027
2028 void FIFO::get_part_info(int64_t part_num,
2029 fifo::part_header* header,
2030 lr::AioCompletion* c)
2031 {
2032 std::unique_lock l(m);
2033 const auto part_oid = info.part_oid(part_num);
2034 auto tid = ++next_tid;
2035 l.unlock();
2036 auto op = rgw::cls::fifo::get_part_info(cct, header, tid);
2037 auto r = ioctx.aio_operate(part_oid, c, &op, nullptr);
2038 ceph_assert(r >= 0);
2039 }
2040
2041 struct InfoGetter : Completion<InfoGetter> {
2042 FIFO* fifo;
2043 fifo::part_header header;
2044 fu2::function<void(int r, fifo::part_header&&)> f;
2045 std::uint64_t tid;
2046 bool headerread = false;
2047
2048 InfoGetter(const DoutPrefixProvider *dpp, FIFO* fifo, fu2::function<void(int r, fifo::part_header&&)> f,
2049 std::uint64_t tid, lr::AioCompletion* super)
2050 : Completion(dpp, super), fifo(fifo), f(std::move(f)), tid(tid) {}
2051 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
2052 if (!headerread) {
2053 if (r < 0) {
2054 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
2055 << " read_meta failed: r="
2056 << r << " tid=" << tid << dendl;
2057 if (f)
2058 f(r, {});
2059 complete(std::move(p), r);
2060 return;
2061 }
2062
2063 auto info = fifo->meta();
2064 auto hpn = info.head_part_num;
2065 if (hpn < 0) {
2066 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2067 << " no head, returning empty partinfo r="
2068 << r << " tid=" << tid << dendl;
2069 if (f)
2070 f(0, {});
2071 complete(std::move(p), r);
2072 return;
2073 }
2074 headerread = true;
2075 auto op = rgw::cls::fifo::get_part_info(fifo->cct, &header, tid);
2076 std::unique_lock l(fifo->m);
2077 auto oid = fifo->info.part_oid(hpn);
2078 l.unlock();
2079 r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op,
2080 nullptr);
2081 ceph_assert(r >= 0);
2082 return;
2083 }
2084
2085 if (r < 0) {
2086 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
2087 << " get_part_info failed: r="
2088 << r << " tid=" << tid << dendl;
2089 }
2090
2091 if (f)
2092 f(r, std::move(header));
2093 complete(std::move(p), r);
2094 return;
2095 }
2096 };
2097
2098 void FIFO::get_head_info(const DoutPrefixProvider *dpp, fu2::unique_function<void(int r,
2099 fifo::part_header&&)> f,
2100 lr::AioCompletion* c)
2101 {
2102 std::unique_lock l(m);
2103 auto tid = ++next_tid;
2104 l.unlock();
2105 auto ig = std::make_unique<InfoGetter>(dpp, this, std::move(f), tid, c);
2106 read_meta(dpp, tid, InfoGetter::call(std::move(ig)));
2107 }
2108
2109 struct JournalProcessor : public Completion<JournalProcessor> {
2110 private:
2111 FIFO* const fifo;
2112
2113 std::vector<fifo::journal_entry> processed;
2114 decltype(fifo->info.journal) journal;
2115 decltype(journal)::iterator iter;
2116 std::int64_t new_tail;
2117 std::int64_t new_head;
2118 std::int64_t new_max;
2119 int race_retries = 0;
2120 bool first_pp = true;
2121 bool canceled = false;
2122 std::uint64_t tid;
2123
2124 enum {
2125 entry_callback,
2126 pp_callback,
2127 } state;
2128
2129 void create_part(const DoutPrefixProvider *dpp, Ptr&& p, int64_t part_num) {
2130 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2131 << " entering: tid=" << tid << dendl;
2132 state = entry_callback;
2133 lr::ObjectWriteOperation op;
2134 op.create(false); /* We don't need exclusivity, part_init ensures
2135 we're creating from the same journal entry. */
2136 std::unique_lock l(fifo->m);
2137 part_init(&op, fifo->info.params);
2138 auto oid = fifo->info.part_oid(part_num);
2139 l.unlock();
2140 auto r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op);
2141 ceph_assert(r >= 0);
2142 return;
2143 }
2144
2145 void remove_part(const DoutPrefixProvider *dpp, Ptr&& p, int64_t part_num) {
2146 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2147 << " entering: tid=" << tid << dendl;
2148 state = entry_callback;
2149 lr::ObjectWriteOperation op;
2150 op.remove();
2151 std::unique_lock l(fifo->m);
2152 auto oid = fifo->info.part_oid(part_num);
2153 l.unlock();
2154 auto r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op);
2155 ceph_assert(r >= 0);
2156 return;
2157 }
2158
2159 void finish_je(const DoutPrefixProvider *dpp, Ptr&& p, int r,
2160 const fifo::journal_entry& entry) {
2161 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2162 << " entering: tid=" << tid << dendl;
2163
2164 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2165 << " finishing entry: entry=" << entry
2166 << " tid=" << tid << dendl;
2167
2168 using enum fifo::journal_entry::Op;
2169 if (entry.op == remove && r == -ENOENT)
2170 r = 0;
2171
2172 if (r < 0) {
2173 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
2174 << " processing entry failed: entry=" << entry
2175 << " r=" << r << " tid=" << tid << dendl;
2176 complete(std::move(p), r);
2177 return;
2178 } else {
2179 switch (entry.op) {
2180 case unknown:
2181 case set_head:
2182 // Can't happen. Filtered out in process.
2183 complete(std::move(p), -EIO);
2184 return;
2185
2186 case create:
2187 if (entry.part_num > new_max) {
2188 new_max = entry.part_num;
2189 }
2190 break;
2191 case remove:
2192 if (entry.part_num >= new_tail) {
2193 new_tail = entry.part_num + 1;
2194 }
2195 break;
2196 }
2197 processed.push_back(entry);
2198 }
2199 ++iter;
2200 process(dpp, std::move(p));
2201 }
2202
2203 void postprocess(const DoutPrefixProvider *dpp, Ptr&& p) {
2204 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2205 << " entering: tid=" << tid << dendl;
2206 if (processed.empty()) {
2207 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2208 << " nothing to update any more: race_retries="
2209 << race_retries << " tid=" << tid << dendl;
2210 complete(std::move(p), 0);
2211 return;
2212 }
2213 pp_run(dpp, std::move(p), 0, false);
2214 }
2215
2216 public:
2217
2218 JournalProcessor(const DoutPrefixProvider *dpp, FIFO* fifo, std::uint64_t tid, lr::AioCompletion* super)
2219 : Completion(dpp, super), fifo(fifo), tid(tid) {
2220 std::unique_lock l(fifo->m);
2221 journal = fifo->info.journal;
2222 iter = journal.begin();
2223 new_tail = fifo->info.tail_part_num;
2224 new_head = fifo->info.head_part_num;
2225 new_max = fifo->info.max_push_part_num;
2226 }
2227
2228 void pp_run(const DoutPrefixProvider *dpp, Ptr&& p, int r, bool canceled) {
2229 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2230 << " entering: tid=" << tid << dendl;
2231 std::optional<int64_t> tail_part_num;
2232 std::optional<int64_t> head_part_num;
2233 std::optional<int64_t> max_part_num;
2234
2235 if (r < 0) {
2236 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
2237 << " failed, r=: " << r << " tid=" << tid << dendl;
2238 complete(std::move(p), r);
2239 }
2240
2241
2242 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2243 << " postprocessing: race_retries="
2244 << race_retries << " tid=" << tid << dendl;
2245
2246 if (!first_pp && r == 0 && !canceled) {
2247 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2248 << " nothing to update any more: race_retries="
2249 << race_retries << " tid=" << tid << dendl;
2250 complete(std::move(p), 0);
2251 return;
2252 }
2253
2254 first_pp = false;
2255
2256 if (canceled) {
2257 if (race_retries >= MAX_RACE_RETRIES) {
2258 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
2259 << " canceled too many times, giving up: tid="
2260 << tid << dendl;
2261 complete(std::move(p), -ECANCELED);
2262 return;
2263 }
2264 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2265 << " update canceled, retrying: race_retries="
2266 << race_retries << " tid=" << tid << dendl;
2267
2268 ++race_retries;
2269
2270 std::vector<fifo::journal_entry> new_processed;
2271 std::unique_lock l(fifo->m);
2272 for (auto& e : processed) {
2273 if (fifo->info.journal.contains(e)) {
2274 new_processed.push_back(e);
2275 }
2276 }
2277 processed = std::move(new_processed);
2278 }
2279
2280 std::unique_lock l(fifo->m);
2281 auto objv = fifo->info.version;
2282 if (new_tail > fifo->info.tail_part_num) {
2283 tail_part_num = new_tail;
2284 }
2285
2286 if (new_head > fifo->info.head_part_num) {
2287 head_part_num = new_head;
2288 }
2289
2290 if (new_max > fifo->info.max_push_part_num) {
2291 max_part_num = new_max;
2292 }
2293 l.unlock();
2294
2295 if (processed.empty() &&
2296 !tail_part_num &&
2297 !max_part_num) {
2298 /* nothing to update anymore */
2299 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2300 << " nothing to update any more: race_retries="
2301 << race_retries << " tid=" << tid << dendl;
2302 complete(std::move(p), 0);
2303 return;
2304 }
2305 state = pp_callback;
2306 fifo->_update_meta(dpp, fifo::update{}
2307 .tail_part_num(tail_part_num)
2308 .head_part_num(head_part_num)
2309 .max_push_part_num(max_part_num)
2310 .journal_entries_rm(processed),
2311 objv, &this->canceled, tid, call(std::move(p)));
2312 return;
2313 }
2314
2315 JournalProcessor(const JournalProcessor&) = delete;
2316 JournalProcessor& operator =(const JournalProcessor&) = delete;
2317 JournalProcessor(JournalProcessor&&) = delete;
2318 JournalProcessor& operator =(JournalProcessor&&) = delete;
2319
2320 void process(const DoutPrefixProvider *dpp, Ptr&& p) {
2321 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2322 << " entering: tid=" << tid << dendl;
2323 while (iter != journal.end()) {
2324 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2325 << " processing entry: entry=" << *iter
2326 << " tid=" << tid << dendl;
2327 const auto entry = *iter;
2328 switch (entry.op) {
2329 using enum fifo::journal_entry::Op;
2330 case create:
2331 create_part(dpp, std::move(p), entry.part_num);
2332 return;
2333 case set_head:
2334 if (entry.part_num > new_head) {
2335 new_head = entry.part_num;
2336 }
2337 processed.push_back(entry);
2338 ++iter;
2339 continue;
2340 case remove:
2341 remove_part(dpp, std::move(p), entry.part_num);
2342 return;
2343 default:
2344 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
2345 << " unknown journaled op: entry=" << entry << " tid="
2346 << tid << dendl;
2347 complete(std::move(p), -EIO);
2348 return;
2349 }
2350 }
2351 postprocess(dpp, std::move(p));
2352 return;
2353 }
2354
2355 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
2356 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2357 << " entering: tid=" << tid << dendl;
2358 switch (state) {
2359 case entry_callback:
2360 finish_je(dpp, std::move(p), r, *iter);
2361 return;
2362 case pp_callback:
2363 auto c = canceled;
2364 canceled = false;
2365 pp_run(dpp, std::move(p), r, c);
2366 return;
2367 }
2368
2369 abort();
2370 }
2371
2372 };
2373
2374 void FIFO::process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c) {
2375 auto p = std::make_unique<JournalProcessor>(dpp, this, tid, c);
2376 p->process(dpp, std::move(p));
2377 }
2378
2379 struct Lister : Completion<Lister> {
2380 FIFO* f;
2381 std::vector<list_entry> result;
2382 bool more = false;
2383 std::int64_t part_num;
2384 std::uint64_t ofs;
2385 int max_entries;
2386 int r_out = 0;
2387 std::vector<fifo::part_list_entry> entries;
2388 bool part_more = false;
2389 bool part_full = false;
2390 std::vector<list_entry>* entries_out;
2391 bool* more_out;
2392 std::uint64_t tid;
2393
2394 bool read = false;
2395
2396 void complete(Ptr&& p, int r) {
2397 if (r >= 0) {
2398 if (more_out) *more_out = more;
2399 if (entries_out) *entries_out = std::move(result);
2400 }
2401 Completion::complete(std::move(p), r);
2402 }
2403
2404 public:
2405 Lister(const DoutPrefixProvider *dpp, FIFO* f, std::int64_t part_num, std::uint64_t ofs, int max_entries,
2406 std::vector<list_entry>* entries_out, bool* more_out,
2407 std::uint64_t tid, lr::AioCompletion* super)
2408 : Completion(dpp, super), f(f), part_num(part_num), ofs(ofs), max_entries(max_entries),
2409 entries_out(entries_out), more_out(more_out), tid(tid) {
2410 result.reserve(max_entries);
2411 }
2412
2413 Lister(const Lister&) = delete;
2414 Lister& operator =(const Lister&) = delete;
2415 Lister(Lister&&) = delete;
2416 Lister& operator =(Lister&&) = delete;
2417
2418 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
2419 if (read)
2420 handle_read(std::move(p), r);
2421 else
2422 handle_list(dpp, std::move(p), r);
2423 }
2424
2425 void list(Ptr&& p) {
2426 if (max_entries > 0) {
2427 part_more = false;
2428 part_full = false;
2429 entries.clear();
2430
2431 std::unique_lock l(f->m);
2432 auto part_oid = f->info.part_oid(part_num);
2433 l.unlock();
2434
2435 read = false;
2436 auto op = list_part(f->cct, ofs, max_entries, &r_out,
2437 &entries, &part_more, &part_full, tid);
2438 f->ioctx.aio_operate(part_oid, call(std::move(p)), &op, nullptr);
2439 } else {
2440 complete(std::move(p), 0);
2441 }
2442 }
2443
2444 void handle_read(Ptr&& p, int r) {
2445 read = false;
2446 if (r >= 0) r = r_out;
2447 r_out = 0;
2448
2449 if (r < 0) {
2450 complete(std::move(p), r);
2451 return;
2452 }
2453
2454 if (part_num < f->info.tail_part_num) {
2455 /* raced with trim? restart */
2456 max_entries += result.size();
2457 result.clear();
2458 part_num = f->info.tail_part_num;
2459 ofs = 0;
2460 list(std::move(p));
2461 return;
2462 }
2463 /* assuming part was not written yet, so end of data */
2464 more = false;
2465 complete(std::move(p), 0);
2466 return;
2467 }
2468
2469 void handle_list(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
2470 if (r >= 0) r = r_out;
2471 r_out = 0;
2472 std::unique_lock l(f->m);
2473 auto part_oid = f->info.part_oid(part_num);
2474 l.unlock();
2475 if (r == -ENOENT) {
2476 read = true;
2477 f->read_meta(dpp, tid, call(std::move(p)));
2478 return;
2479 }
2480 if (r < 0) {
2481 complete(std::move(p), r);
2482 return;
2483 }
2484
2485 more = part_full || part_more;
2486 for (auto& entry : entries) {
2487 list_entry e;
2488 e.data = std::move(entry.data);
2489 e.marker = marker{part_num, entry.ofs}.to_string();
2490 e.mtime = entry.mtime;
2491 result.push_back(std::move(e));
2492 }
2493 max_entries -= entries.size();
2494 entries.clear();
2495 if (max_entries > 0 && part_more) {
2496 list(std::move(p));
2497 return;
2498 }
2499
2500 if (!part_full) { /* head part is not full */
2501 complete(std::move(p), 0);
2502 return;
2503 }
2504 ++part_num;
2505 ofs = 0;
2506 list(std::move(p));
2507 }
2508 };
2509
2510 void FIFO::list(const DoutPrefixProvider *dpp, int max_entries,
2511 std::optional<std::string_view> markstr,
2512 std::vector<list_entry>* out,
2513 bool* more,
2514 lr::AioCompletion* c) {
2515 std::unique_lock l(m);
2516 auto tid = ++next_tid;
2517 std::int64_t part_num = info.tail_part_num;
2518 l.unlock();
2519 std::uint64_t ofs = 0;
2520 std::optional<::rgw::cls::fifo::marker> marker;
2521
2522 if (markstr) {
2523 marker = to_marker(*markstr);
2524 if (marker) {
2525 part_num = marker->num;
2526 ofs = marker->ofs;
2527 }
2528 }
2529
2530 auto ls = std::make_unique<Lister>(dpp, this, part_num, ofs, max_entries, out,
2531 more, tid, c);
2532 if (markstr && !marker) {
2533 auto l = ls.get();
2534 l->complete(std::move(ls), -EINVAL);
2535 } else {
2536 ls->list(std::move(ls));
2537 }
2538 }
2539 }