1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2020 Red Hat <contact@redhat.com>
7 * Author: Adam C. Emerson
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
20 #include <string_view>
22 #include <fmt/format.h>
24 #include "include/rados/librados.hpp"
26 #include "include/buffer.h"
28 #include "common/async/yield_context.h"
29 #include "common/random_string.h"
31 #include "cls/fifo/cls_fifo_types.h"
32 #include "cls/fifo/cls_fifo_ops.h"
34 #include "cls_fifo_legacy.h"
36 namespace rgw::cls::fifo
{
37 namespace cb
= ceph::buffer
;
38 namespace fifo
= rados::cls::fifo
;
40 using ceph::from_error_code
;
42 inline constexpr auto MAX_RACE_RETRIES
= 10;
44 void create_meta(lr::ObjectWriteOperation
* op
,
46 std::optional
<fifo::objv
> objv
,
47 std::optional
<std::string_view
> oid_prefix
,
49 std::uint64_t max_part_size
,
50 std::uint64_t max_entry_size
)
52 fifo::op::create_meta cm
;
56 cm
.oid_prefix
= oid_prefix
;
57 cm
.max_part_size
= max_part_size
;
58 cm
.max_entry_size
= max_entry_size
;
59 cm
.exclusive
= exclusive
;
63 op
->exec(fifo::op::CLASS
, fifo::op::CREATE_META
, in
);
66 int get_meta(const DoutPrefixProvider
*dpp
, lr::IoCtx
& ioctx
, const std::string
& oid
,
67 std::optional
<fifo::objv
> objv
, fifo::info
* info
,
68 std::uint32_t* part_header_size
,
69 std::uint32_t* part_entry_overhead
,
70 uint64_t tid
, optional_yield y
,
73 lr::ObjectReadOperation op
;
74 fifo::op::get_meta gm
;
80 op
.exec(fifo::op::CLASS
, fifo::op::GET_META
, in
,
82 auto r
= rgw_rados_operate(dpp
, ioctx
, oid
, &op
, nullptr, y
);
84 fifo::op::get_meta_reply reply
;
85 auto iter
= bl
.cbegin();
87 if (info
) *info
= std::move(reply
.info
);
88 if (part_header_size
) *part_header_size
= reply
.part_header_size
;
89 if (part_entry_overhead
)
90 *part_entry_overhead
= reply
.part_entry_overhead
;
91 } catch (const cb::error
& err
) {
93 << __PRETTY_FUNCTION__
<< ":" << __LINE__
94 << " decode failed: " << err
.what()
95 << " tid=" << tid
<< dendl
;
96 r
= from_error_code(err
.code());
97 } else if (!(probe
&& (r
== -ENOENT
|| r
== -ENODATA
))) {
99 << __PRETTY_FUNCTION__
<< ":" << __LINE__
100 << " fifo::op::GET_META failed r=" << r
<< " tid=" << tid
107 void update_meta(lr::ObjectWriteOperation
* op
, const fifo::objv
& objv
,
108 const fifo::update
& update
)
110 fifo::op::update_meta um
;
113 um
.tail_part_num
= update
.tail_part_num();
114 um
.head_part_num
= update
.head_part_num();
115 um
.min_push_part_num
= update
.min_push_part_num();
116 um
.max_push_part_num
= update
.max_push_part_num();
117 um
.journal_entries_add
= std::move(update
).journal_entries_add();
118 um
.journal_entries_rm
= std::move(update
).journal_entries_rm();
122 op
->exec(fifo::op::CLASS
, fifo::op::UPDATE_META
, in
);
125 void part_init(lr::ObjectWriteOperation
* op
, fifo::data_params params
)
127 fifo::op::init_part ip
;
133 op
->exec(fifo::op::CLASS
, fifo::op::INIT_PART
, in
);
136 int push_part(const DoutPrefixProvider
*dpp
, lr::IoCtx
& ioctx
, const std::string
& oid
,
137 std::deque
<cb::list
> data_bufs
, std::uint64_t tid
,
140 lr::ObjectWriteOperation op
;
141 fifo::op::push_part pp
;
145 pp
.data_bufs
= data_bufs
;
148 for (const auto& bl
: data_bufs
)
149 pp
.total_len
+= bl
.length();
154 op
.exec(fifo::op::CLASS
, fifo::op::PUSH_PART
, in
, nullptr, &retval
);
155 auto r
= rgw_rados_operate(dpp
, ioctx
, oid
, &op
, y
, lr::OPERATION_RETURNVEC
);
158 << __PRETTY_FUNCTION__
<< ":" << __LINE__
159 << " fifo::op::PUSH_PART failed r=" << r
160 << " tid=" << tid
<< dendl
;
165 << __PRETTY_FUNCTION__
<< ":" << __LINE__
166 << " error handling response retval=" << retval
167 << " tid=" << tid
<< dendl
;
172 void push_part(lr::IoCtx
& ioctx
, const std::string
& oid
,
173 std::deque
<cb::list
> data_bufs
, std::uint64_t tid
,
174 lr::AioCompletion
* c
)
176 lr::ObjectWriteOperation op
;
177 fifo::op::push_part pp
;
179 pp
.data_bufs
= data_bufs
;
182 for (const auto& bl
: data_bufs
)
183 pp
.total_len
+= bl
.length();
187 op
.exec(fifo::op::CLASS
, fifo::op::PUSH_PART
, in
);
188 auto r
= ioctx
.aio_operate(oid
, c
, &op
, lr::OPERATION_RETURNVEC
);
192 void trim_part(lr::ObjectWriteOperation
* op
,
193 std::uint64_t ofs
, bool exclusive
)
195 fifo::op::trim_part tp
;
198 tp
.exclusive
= exclusive
;
202 op
->exec(fifo::op::CLASS
, fifo::op::TRIM_PART
, in
);
205 int list_part(const DoutPrefixProvider
*dpp
, lr::IoCtx
& ioctx
, const std::string
& oid
,
206 std::uint64_t ofs
, std::uint64_t max_entries
,
207 std::vector
<fifo::part_list_entry
>* entries
,
208 bool* more
, bool* full_part
,
209 std::uint64_t tid
, optional_yield y
)
211 lr::ObjectReadOperation op
;
212 fifo::op::list_part lp
;
215 lp
.max_entries
= max_entries
;
220 op
.exec(fifo::op::CLASS
, fifo::op::LIST_PART
, in
, &bl
, nullptr);
221 auto r
= rgw_rados_operate(dpp
, ioctx
, oid
, &op
, nullptr, y
);
223 fifo::op::list_part_reply reply
;
224 auto iter
= bl
.cbegin();
226 if (entries
) *entries
= std::move(reply
.entries
);
227 if (more
) *more
= reply
.more
;
228 if (full_part
) *full_part
= reply
.full_part
;
229 } catch (const cb::error
& err
) {
231 << __PRETTY_FUNCTION__
<< ":" << __LINE__
232 << " decode failed: " << err
.what()
233 << " tid=" << tid
<< dendl
;
234 r
= from_error_code(err
.code());
235 } else if (r
!= -ENOENT
) {
237 << __PRETTY_FUNCTION__
<< ":" << __LINE__
238 << " fifo::op::LIST_PART failed r=" << r
<< " tid=" << tid
244 struct list_entry_completion
: public lr::ObjectOperationCompletion
{
247 std::vector
<fifo::part_list_entry
>* entries
;
252 list_entry_completion(CephContext
* cct
, int* r_out
, std::vector
<fifo::part_list_entry
>* entries
,
253 bool* more
, bool* full_part
, std::uint64_t tid
)
254 : cct(cct
), r_out(r_out
), entries(entries
), more(more
),
255 full_part(full_part
), tid(tid
) {}
256 virtual ~list_entry_completion() = default;
257 void handle_completion(int r
, bufferlist
& bl
) override
{
259 fifo::op::list_part_reply reply
;
260 auto iter
= bl
.cbegin();
262 if (entries
) *entries
= std::move(reply
.entries
);
263 if (more
) *more
= reply
.more
;
264 if (full_part
) *full_part
= reply
.full_part
;
265 } catch (const cb::error
& err
) {
267 << __PRETTY_FUNCTION__
<< ":" << __LINE__
268 << " decode failed: " << err
.what()
269 << " tid=" << tid
<< dendl
;
270 r
= from_error_code(err
.code());
273 << __PRETTY_FUNCTION__
<< ":" << __LINE__
274 << " fifo::op::LIST_PART failed r=" << r
<< " tid=" << tid
277 if (r_out
) *r_out
= r
;
281 lr::ObjectReadOperation
list_part(CephContext
* cct
,
283 std::uint64_t max_entries
,
285 std::vector
<fifo::part_list_entry
>* entries
,
286 bool* more
, bool* full_part
,
289 lr::ObjectReadOperation op
;
290 fifo::op::list_part lp
;
293 lp
.max_entries
= max_entries
;
297 op
.exec(fifo::op::CLASS
, fifo::op::LIST_PART
, in
,
298 new list_entry_completion(cct
, r_out
, entries
, more
, full_part
,
303 int get_part_info(const DoutPrefixProvider
*dpp
, lr::IoCtx
& ioctx
, const std::string
& oid
,
304 fifo::part_header
* header
,
305 std::uint64_t tid
, optional_yield y
)
307 lr::ObjectReadOperation op
;
308 fifo::op::get_part_info gpi
;
313 op
.exec(fifo::op::CLASS
, fifo::op::GET_PART_INFO
, in
, &bl
, nullptr);
314 auto r
= rgw_rados_operate(dpp
, ioctx
, oid
, &op
, nullptr, y
);
316 fifo::op::get_part_info_reply reply
;
317 auto iter
= bl
.cbegin();
319 if (header
) *header
= std::move(reply
.header
);
320 } catch (const cb::error
& err
) {
322 << __PRETTY_FUNCTION__
<< ":" << __LINE__
323 << " decode failed: " << err
.what()
324 << " tid=" << tid
<< dendl
;
325 r
= from_error_code(err
.code());
328 << __PRETTY_FUNCTION__
<< ":" << __LINE__
329 << " fifo::op::GET_PART_INFO failed r=" << r
<< " tid=" << tid
335 struct partinfo_completion
: public lr::ObjectOperationCompletion
{
338 fifo::part_header
* h
;
340 partinfo_completion(CephContext
* cct
, int* rp
, fifo::part_header
* h
,
342 cct(cct
), rp(rp
), h(h
), tid(tid
) {
344 virtual ~partinfo_completion() = default;
345 void handle_completion(int r
, bufferlist
& bl
) override
{
347 fifo::op::get_part_info_reply reply
;
348 auto iter
= bl
.cbegin();
350 if (h
) *h
= std::move(reply
.header
);
351 } catch (const cb::error
& err
) {
352 r
= from_error_code(err
.code());
353 lderr(cct
) << __PRETTY_FUNCTION__
<< ":" << __LINE__
354 << " decode failed: " << err
.what()
355 << " tid=" << tid
<< dendl
;
357 lderr(cct
) << __PRETTY_FUNCTION__
<< ":" << __LINE__
358 << " fifo::op::GET_PART_INFO failed r=" << r
<< " tid=" << tid
367 lr::ObjectReadOperation
get_part_info(CephContext
* cct
,
368 fifo::part_header
* header
,
369 std::uint64_t tid
, int* r
= 0)
371 lr::ObjectReadOperation op
;
372 fifo::op::get_part_info gpi
;
377 op
.exec(fifo::op::CLASS
, fifo::op::GET_PART_INFO
, in
,
378 new partinfo_completion(cct
, r
, header
, tid
));
383 std::optional
<marker
> FIFO::to_marker(std::string_view s
)
387 m
.num
= info
.tail_part_num
;
392 auto pos
= s
.find(':');
397 auto num
= s
.substr(0, pos
);
398 auto ofs
= s
.substr(pos
+ 1);
400 auto n
= ceph::parse
<decltype(m
.num
)>(num
);
405 auto o
= ceph::parse
<decltype(m
.ofs
)>(ofs
);
413 int FIFO::apply_update(const DoutPrefixProvider
*dpp
,
415 const fifo::objv
& objv
,
416 const fifo::update
& update
,
419 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
420 << " entering: tid=" << tid
<< dendl
;
421 std::unique_lock
l(m
);
422 if (objv
!= info
->version
) {
423 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
424 << " version mismatch, canceling: tid=" << tid
<< dendl
;
428 info
->apply_update(update
);
432 int FIFO::_update_meta(const DoutPrefixProvider
*dpp
, const fifo::update
& update
,
433 fifo::objv version
, bool* pcanceled
,
434 std::uint64_t tid
, optional_yield y
)
436 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
437 << " entering: tid=" << tid
<< dendl
;
438 lr::ObjectWriteOperation op
;
439 bool canceled
= false;
440 update_meta(&op
, version
, update
);
441 auto r
= rgw_rados_operate(dpp
, ioctx
, oid
, &op
, y
);
442 if (r
>= 0 || r
== -ECANCELED
) {
443 canceled
= (r
== -ECANCELED
);
445 r
= apply_update(dpp
, &info
, version
, update
, tid
);
446 if (r
< 0) canceled
= true;
449 r
= read_meta(dpp
, tid
, y
);
450 canceled
= r
< 0 ? false : true;
453 if (pcanceled
) *pcanceled
= canceled
;
455 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
456 << " canceled: tid=" << tid
<< dendl
;
459 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
460 << " returning error: r=" << r
<< " tid=" << tid
<< dendl
;
465 struct Updater
: public Completion
<Updater
> {
470 bool* pcanceled
= nullptr;
472 Updater(const DoutPrefixProvider
*dpp
, FIFO
* fifo
, lr::AioCompletion
* super
,
473 const fifo::update
& update
, fifo::objv version
,
474 bool* pcanceled
, std::uint64_t tid
)
475 : Completion(dpp
, super
), fifo(fifo
), update(update
), version(version
),
476 pcanceled(pcanceled
) {}
478 void handle(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
) {
479 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
480 << " entering: tid=" << tid
<< dendl
;
482 handle_reread(dpp
, std::move(p
), r
);
484 handle_update(dpp
, std::move(p
), r
);
487 void handle_update(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
) {
488 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
489 << " handling async update_meta: tid="
491 if (r
< 0 && r
!= -ECANCELED
) {
492 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
493 << " update failed: r=" << r
<< " tid=" << tid
<< dendl
;
494 complete(std::move(p
), r
);
497 bool canceled
= (r
== -ECANCELED
);
499 int r
= fifo
->apply_update(dpp
, &fifo
->info
, version
, update
, tid
);
501 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
502 << " update failed, marking canceled: r=" << r
503 << " tid=" << tid
<< dendl
;
509 fifo
->read_meta(dpp
, tid
, call(std::move(p
)));
514 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
515 << " completing: tid=" << tid
<< dendl
;
516 complete(std::move(p
), 0);
519 void handle_reread(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
) {
520 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
521 << " handling async read_meta: tid="
523 if (r
< 0 && pcanceled
) {
525 } else if (r
>= 0 && pcanceled
) {
529 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
530 << " failed dispatching read_meta: r=" << r
<< " tid="
533 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
534 << " completing: tid=" << tid
<< dendl
;
536 complete(std::move(p
), r
);
540 void FIFO::_update_meta(const DoutPrefixProvider
*dpp
, const fifo::update
& update
,
541 fifo::objv version
, bool* pcanceled
,
542 std::uint64_t tid
, lr::AioCompletion
* c
)
544 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
545 << " entering: tid=" << tid
<< dendl
;
546 lr::ObjectWriteOperation op
;
547 update_meta(&op
, info
.version
, update
);
548 auto updater
= std::make_unique
<Updater
>(dpp
, this, c
, update
, version
, pcanceled
,
550 auto r
= ioctx
.aio_operate(oid
, Updater::call(std::move(updater
)), &op
);
554 int FIFO::create_part(const DoutPrefixProvider
*dpp
, int64_t part_num
, std::uint64_t tid
,
557 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
558 << " entering: tid=" << tid
<< dendl
;
559 lr::ObjectWriteOperation op
;
560 op
.create(false); /* We don't need exclusivity, part_init ensures
561 we're creating from the same journal entry. */
562 std::unique_lock
l(m
);
563 part_init(&op
, info
.params
);
564 auto oid
= info
.part_oid(part_num
);
566 auto r
= rgw_rados_operate(dpp
, ioctx
, oid
, &op
, y
);
568 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
569 << " part_init failed: r=" << r
<< " tid="
575 int FIFO::remove_part(const DoutPrefixProvider
*dpp
, int64_t part_num
, std::uint64_t tid
,
578 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
579 << " entering: tid=" << tid
<< dendl
;
580 lr::ObjectWriteOperation op
;
582 std::unique_lock
l(m
);
583 auto oid
= info
.part_oid(part_num
);
585 auto r
= rgw_rados_operate(dpp
, ioctx
, oid
, &op
, y
);
587 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
588 << " remove failed: r=" << r
<< " tid="
594 int FIFO::process_journal(const DoutPrefixProvider
*dpp
, std::uint64_t tid
, optional_yield y
)
596 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
597 << " entering: tid=" << tid
<< dendl
;
598 std::vector
<fifo::journal_entry
> processed
;
600 std::unique_lock
l(m
);
601 auto tmpjournal
= info
.journal
;
602 auto new_tail
= info
.tail_part_num
;
603 auto new_head
= info
.head_part_num
;
604 auto new_max
= info
.max_push_part_num
;
608 for (auto& entry
: tmpjournal
) {
609 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
610 << " processing entry: entry=" << entry
<< " tid=" << tid
613 using enum fifo::journal_entry::Op
;
615 r
= create_part(dpp
, entry
.part_num
, tid
, y
);
616 if (entry
.part_num
> new_max
) {
617 new_max
= entry
.part_num
;
622 if (entry
.part_num
> new_head
) {
623 new_head
= entry
.part_num
;
627 r
= remove_part(dpp
, entry
.part_num
, tid
, y
);
628 if (r
== -ENOENT
) r
= 0;
629 if (entry
.part_num
>= new_tail
) {
630 new_tail
= entry
.part_num
+ 1;
634 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
635 << " unknown journaled op: entry=" << entry
<< " tid="
641 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
642 << " processing entry failed: entry=" << entry
643 << " r=" << r
<< " tid=" << tid
<< dendl
;
647 processed
.push_back(std::move(entry
));
651 bool canceled
= true;
653 for (auto i
= 0; canceled
&& i
< MAX_RACE_RETRIES
; ++i
) {
654 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
655 << " postprocessing: i=" << i
<< " tid=" << tid
<< dendl
;
657 std::optional
<int64_t> tail_part_num
;
658 std::optional
<int64_t> head_part_num
;
659 std::optional
<int64_t> max_part_num
;
661 std::unique_lock
l(m
);
662 auto objv
= info
.version
;
663 if (new_tail
> tail_part_num
) tail_part_num
= new_tail
;
664 if (new_head
> info
.head_part_num
) head_part_num
= new_head
;
665 if (new_max
> info
.max_push_part_num
) max_part_num
= new_max
;
668 if (processed
.empty() &&
671 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
672 << " nothing to update any more: i=" << i
<< " tid="
677 auto u
= fifo::update().tail_part_num(tail_part_num
)
678 .head_part_num(head_part_num
).max_push_part_num(max_part_num
)
679 .journal_entries_rm(processed
);
680 r
= _update_meta(dpp
, u
, objv
, &canceled
, tid
, y
);
682 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
683 << " _update_meta failed: update=" << u
684 << " r=" << r
<< " tid=" << tid
<< dendl
;
689 std::vector
<fifo::journal_entry
> new_processed
;
690 std::unique_lock
l(m
);
691 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
692 << " update canceled, retrying: i=" << i
<< " tid="
694 for (auto& e
: processed
) {
695 if (info
.journal
.contains(e
)) {
696 new_processed
.push_back(e
);
699 processed
= std::move(new_processed
);
702 if (r
== 0 && canceled
) {
703 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
704 << " canceled too many times, giving up: tid=" << tid
<< dendl
;
708 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
709 << " failed, r=: " << r
<< " tid=" << tid
<< dendl
;
714 int FIFO::_prepare_new_part(const DoutPrefixProvider
*dpp
,
715 std::int64_t new_part_num
, bool is_head
,
716 std::uint64_t tid
, optional_yield y
)
718 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
719 << " entering: tid=" << tid
<< dendl
;
720 std::unique_lock
l(m
);
721 using enum fifo::journal_entry::Op
;
722 std::vector
<fifo::journal_entry
> jentries
{{ create
, new_part_num
}};
723 if (info
.journal
.contains({create
, new_part_num
}) &&
724 (!is_head
|| info
.journal
.contains({set_head
, new_part_num
}))) {
726 ldpp_dout(dpp
, 5) << __PRETTY_FUNCTION__
<< ":" << __LINE__
727 << " new part journaled, but not processed: tid="
729 auto r
= process_journal(dpp
, tid
, y
);
731 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
732 << " process_journal failed: r=" << r
<< " tid=" << tid
<< dendl
;
736 auto version
= info
.version
;
739 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
740 << " needs new head: tid=" << tid
<< dendl
;
741 jentries
.push_back({ set_head
, new_part_num
});
746 bool canceled
= true;
747 for (auto i
= 0; canceled
&& i
< MAX_RACE_RETRIES
; ++i
) {
749 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
750 << " updating metadata: i=" << i
<< " tid=" << tid
<< dendl
;
751 auto u
= fifo::update
{}.journal_entries_add(jentries
);
752 r
= _update_meta(dpp
, u
, version
, &canceled
, tid
, y
);
753 if (r
>= 0 && canceled
) {
754 std::unique_lock
l(m
);
755 version
= info
.version
;
756 auto found
= (info
.journal
.contains({create
, new_part_num
}) ||
757 info
.journal
.contains({set_head
, new_part_num
}));
758 if ((info
.max_push_part_num
>= new_part_num
&&
759 info
.head_part_num
>= new_part_num
)) {
760 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
761 << " raced, but journaled and processed: i=" << i
762 << " tid=" << tid
<< dendl
;
766 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
767 << " raced, journaled but not processed: i=" << i
768 << " tid=" << tid
<< dendl
;
774 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
775 << " _update_meta failed: update=" << u
<< " r=" << r
776 << " tid=" << tid
<< dendl
;
781 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
782 << " canceled too many times, giving up: tid=" << tid
<< dendl
;
785 r
= process_journal(dpp
, tid
, y
);
787 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
788 << " process_journal failed: r=" << r
<< " tid=" << tid
<< dendl
;
793 int FIFO::_prepare_new_head(const DoutPrefixProvider
*dpp
,
794 std::int64_t new_head_part_num
,
795 std::uint64_t tid
, optional_yield y
)
797 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
798 << " entering: tid=" << tid
<< dendl
;
799 std::unique_lock
l(m
);
800 auto max_push_part_num
= info
.max_push_part_num
;
801 auto version
= info
.version
;
805 if (max_push_part_num
< new_head_part_num
) {
806 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
807 << " need new part: tid=" << tid
<< dendl
;
808 r
= _prepare_new_part(dpp
, new_head_part_num
, true, tid
, y
);
810 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
811 << " _prepare_new_part failed: r=" << r
812 << " tid=" << tid
<< dendl
;
815 std::unique_lock
l(m
);
816 if (info
.max_push_part_num
< new_head_part_num
) {
817 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
818 << " inconsistency, push part less than head part: "
819 << " tid=" << tid
<< dendl
;
826 using enum fifo::journal_entry::Op
;
827 fifo::journal_entry jentry
;
828 jentry
.op
= set_head
;
829 jentry
.part_num
= new_head_part_num
;
832 bool canceled
= true;
833 for (auto i
= 0; canceled
&& i
< MAX_RACE_RETRIES
; ++i
) {
835 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
836 << " updating metadata: i=" << i
<< " tid=" << tid
<< dendl
;
837 auto u
= fifo::update
{}.journal_entries_add({{ jentry
}});
838 r
= _update_meta(dpp
, u
, version
, &canceled
, tid
, y
);
839 if (r
>= 0 && canceled
) {
840 std::unique_lock
l(m
);
841 auto found
= (info
.journal
.contains({create
, new_head_part_num
}) ||
842 info
.journal
.contains({set_head
, new_head_part_num
}));
843 version
= info
.version
;
844 if ((info
.head_part_num
>= new_head_part_num
)) {
845 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
846 << " raced, but journaled and processed: i=" << i
847 << " tid=" << tid
<< dendl
;
851 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
852 << " raced, journaled but not processed: i=" << i
853 << " tid=" << tid
<< dendl
;
859 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
860 << " _update_meta failed: update=" << u
<< " r=" << r
861 << " tid=" << tid
<< dendl
;
866 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
867 << " canceled too many times, giving up: tid=" << tid
<< dendl
;
870 r
= process_journal(dpp
, tid
, y
);
872 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
873 << " process_journal failed: r=" << r
<< " tid=" << tid
<< dendl
;
878 struct NewPartPreparer
: public Completion
<NewPartPreparer
> {
880 std::vector
<fifo::journal_entry
> jentries
;
882 std::int64_t new_part_num
;
883 bool canceled
= false;
886 NewPartPreparer(const DoutPrefixProvider
*dpp
, FIFO
* f
, lr::AioCompletion
* super
,
887 std::vector
<fifo::journal_entry
> jentries
,
888 std::int64_t new_part_num
,
890 : Completion(dpp
, super
), f(f
), jentries(std::move(jentries
)),
891 new_part_num(new_part_num
), tid(tid
) {}
893 void handle(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
) {
894 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
895 << " entering: tid=" << tid
<< dendl
;
897 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
898 << " _update_meta failed: r=" << r
899 << " tid=" << tid
<< dendl
;
900 complete(std::move(p
), r
);
905 using enum fifo::journal_entry::Op
;
906 std::unique_lock
l(f
->m
);
907 auto found
= (f
->info
.journal
.contains({create
, new_part_num
}) ||
908 f
->info
.journal
.contains({set_head
, new_part_num
}));
909 auto max_push_part_num
= f
->info
.max_push_part_num
;
910 auto head_part_num
= f
->info
.head_part_num
;
911 auto version
= f
->info
.version
;
913 if ((max_push_part_num
>= new_part_num
&&
914 head_part_num
>= new_part_num
)) {
915 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
916 << " raced, but journaled and processed: i=" << i
917 << " tid=" << tid
<< dendl
;
918 complete(std::move(p
), 0);
921 if (i
>= MAX_RACE_RETRIES
) {
922 complete(std::move(p
), -ECANCELED
);
927 f
->_update_meta(dpp
, fifo::update
{}
928 .journal_entries_add(jentries
),
929 version
, &canceled
, tid
, call(std::move(p
)));
932 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
933 << " raced, journaled but not processed: i=" << i
934 << " tid=" << tid
<< dendl
;
937 // Fall through. We still need to process the journal.
939 f
->process_journal(dpp
, tid
, super());
944 void FIFO::_prepare_new_part(const DoutPrefixProvider
*dpp
, std::int64_t new_part_num
,
945 bool is_head
, std::uint64_t tid
, lr::AioCompletion
* c
)
947 std::unique_lock
l(m
);
948 using enum fifo::journal_entry::Op
;
949 std::vector
<fifo::journal_entry
> jentries
{{create
, new_part_num
}};
950 if (info
.journal
.contains({create
, new_part_num
}) &&
951 (!is_head
|| info
.journal
.contains({set_head
, new_part_num
}))) {
953 ldpp_dout(dpp
, 5) << __PRETTY_FUNCTION__
<< ":" << __LINE__
954 << " new part journaled, but not processed: tid="
956 process_journal(dpp
, tid
, c
);
959 auto version
= info
.version
;
962 jentries
.push_back({ set_head
, new_part_num
});
966 auto n
= std::make_unique
<NewPartPreparer
>(dpp
, this, c
, jentries
,
969 _update_meta(dpp
, fifo::update
{}.journal_entries_add(jentries
), version
,
970 &np
->canceled
, tid
, NewPartPreparer::call(std::move(n
)));
973 struct NewHeadPreparer
: public Completion
<NewHeadPreparer
> {
977 std::int64_t new_head_part_num
;
978 bool canceled
= false;
981 NewHeadPreparer(const DoutPrefixProvider
*dpp
, FIFO
* f
, lr::AioCompletion
* super
,
982 bool newpart
, std::int64_t new_head_part_num
,
984 : Completion(dpp
, super
), f(f
), newpart(newpart
),
985 new_head_part_num(new_head_part_num
), tid(tid
) {}
987 void handle(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
) {
989 handle_newpart(std::move(p
), r
);
991 handle_update(dpp
, std::move(p
), r
);
994 void handle_newpart(Ptr
&& p
, int r
) {
996 lderr(f
->cct
) << __PRETTY_FUNCTION__
<< ":" << __LINE__
997 << " _prepare_new_part failed: r=" << r
998 << " tid=" << tid
<< dendl
;
999 complete(std::move(p
), r
);
1002 std::unique_lock
l(f
->m
);
1003 if (f
->info
.max_push_part_num
< new_head_part_num
) {
1005 lderr(f
->cct
) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1006 << " _prepare_new_part failed: r=" << r
1007 << " tid=" << tid
<< dendl
;
1008 complete(std::move(p
), -EIO
);
1011 complete(std::move(p
), 0);
1015 void handle_update(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
) {
1016 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1017 << " entering: tid=" << tid
<< dendl
;
1019 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1020 << " _update_meta failed: r=" << r
1021 << " tid=" << tid
<< dendl
;
1022 complete(std::move(p
), r
);
1027 using enum fifo::journal_entry::Op
;
1028 std::unique_lock
l(f
->m
);
1029 auto found
= (f
->info
.journal
.contains({create
, new_head_part_num
}) ||
1030 f
->info
.journal
.contains({set_head
, new_head_part_num
}));
1031 auto head_part_num
= f
->info
.head_part_num
;
1032 auto version
= f
->info
.version
;
1035 if ((head_part_num
>= new_head_part_num
)) {
1036 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1037 << " raced, but journaled and processed: i=" << i
1038 << " tid=" << tid
<< dendl
;
1039 complete(std::move(p
), 0);
1042 if (i
>= MAX_RACE_RETRIES
) {
1043 complete(std::move(p
), -ECANCELED
);
1048 fifo::journal_entry jentry
;
1049 jentry
.op
= set_head
;
1050 jentry
.part_num
= new_head_part_num
;
1051 f
->_update_meta(dpp
, fifo::update
{}
1052 .journal_entries_add({{jentry
}}),
1053 version
, &canceled
, tid
, call(std::move(p
)));
1056 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1057 << " raced, journaled but not processed: i=" << i
1058 << " tid=" << tid
<< dendl
;
1061 // Fall through. We still need to process the journal.
1063 f
->process_journal(dpp
, tid
, super());
1068 void FIFO::_prepare_new_head(const DoutPrefixProvider
*dpp
, std::int64_t new_head_part_num
,
1069 std::uint64_t tid
, lr::AioCompletion
* c
)
1071 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1072 << " entering: tid=" << tid
<< dendl
;
1073 std::unique_lock
l(m
);
1074 auto max_push_part_num
= info
.max_push_part_num
;
1075 auto version
= info
.version
;
1078 if (max_push_part_num
< new_head_part_num
) {
1079 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1080 << " need new part: tid=" << tid
<< dendl
;
1081 auto n
= std::make_unique
<NewHeadPreparer
>(dpp
, this, c
, true, new_head_part_num
,
1083 _prepare_new_part(dpp
, new_head_part_num
, true, tid
,
1084 NewHeadPreparer::call(std::move(n
)));
1086 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1087 << " updating head: tid=" << tid
<< dendl
;
1088 auto n
= std::make_unique
<NewHeadPreparer
>(dpp
, this, c
, false, new_head_part_num
,
1091 using enum fifo::journal_entry::Op
;
1092 fifo::journal_entry jentry
;
1093 jentry
.op
= set_head
;
1094 jentry
.part_num
= new_head_part_num
;
1095 _update_meta(dpp
, fifo::update
{}.journal_entries_add({{jentry
}}), version
,
1096 &np
->canceled
, tid
, NewHeadPreparer::call(std::move(n
)));
1100 int FIFO::push_entries(const DoutPrefixProvider
*dpp
, const std::deque
<cb::list
>& data_bufs
,
1101 std::uint64_t tid
, optional_yield y
)
1103 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1104 << " entering: tid=" << tid
<< dendl
;
1105 std::unique_lock
l(m
);
1106 auto head_part_num
= info
.head_part_num
;
1107 const auto part_oid
= info
.part_oid(head_part_num
);
1110 auto r
= push_part(dpp
, ioctx
, part_oid
, data_bufs
, tid
, y
);
1112 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1113 << " push_part failed: r=" << r
<< " tid=" << tid
<< dendl
;
1118 void FIFO::push_entries(const std::deque
<cb::list
>& data_bufs
,
1119 std::uint64_t tid
, lr::AioCompletion
* c
)
1121 std::unique_lock
l(m
);
1122 auto head_part_num
= info
.head_part_num
;
1123 const auto part_oid
= info
.part_oid(head_part_num
);
1126 push_part(ioctx
, part_oid
, data_bufs
, tid
, c
);
1129 int FIFO::trim_part(const DoutPrefixProvider
*dpp
, int64_t part_num
, uint64_t ofs
,
1130 bool exclusive
, std::uint64_t tid
,
1133 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1134 << " entering: tid=" << tid
<< dendl
;
1135 lr::ObjectWriteOperation op
;
1136 std::unique_lock
l(m
);
1137 const auto part_oid
= info
.part_oid(part_num
);
1139 rgw::cls::fifo::trim_part(&op
, ofs
, exclusive
);
1140 auto r
= rgw_rados_operate(dpp
, ioctx
, part_oid
, &op
, y
);
1142 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1143 << " trim_part failed: r=" << r
<< " tid=" << tid
<< dendl
;
1148 void FIFO::trim_part(const DoutPrefixProvider
*dpp
, int64_t part_num
, uint64_t ofs
,
1149 bool exclusive
, std::uint64_t tid
,
1150 lr::AioCompletion
* c
)
1152 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1153 << " entering: tid=" << tid
<< dendl
;
1154 lr::ObjectWriteOperation op
;
1155 std::unique_lock
l(m
);
1156 const auto part_oid
= info
.part_oid(part_num
);
1158 rgw::cls::fifo::trim_part(&op
, ofs
, exclusive
);
1159 auto r
= ioctx
.aio_operate(part_oid
, c
, &op
);
1160 ceph_assert(r
>= 0);
// Open an existing FIFO whose metadata object is `oid`.
//
// Reads the metadata with get_meta(), constructs a FIFO around `ioctx`/`oid`,
// stores the part header size and per-entry overhead reported by get_meta(),
// replays any leftover journal entries via process_journal(), and finally
// hands the instance to the caller through `*fifo`.
// When `probe` is set, -ENOENT / -ENODATA from get_meta() are not logged as
// errors (see the condition below).
// NOTE(review): parts of this definition (remaining parameters, local
// declarations, error returns, braces) are not visible in this excerpt.
1163 int FIFO::open(const DoutPrefixProvider
*dpp
, lr::IoCtx ioctx
, std::string oid
, std::unique_ptr
<FIFO
>* fifo
,
1164 optional_yield y
, std::optional
<fifo::objv
> objv
,
1168 << __PRETTY_FUNCTION__
<< ":" << __LINE__
1169 << " entering" << dendl
;
// NOTE(review): `oid` is wrapped in std::move() here and then used again when
// constructing the FIFO below. If this call resolves to the free get_meta()
// that takes `const std::string& oid`, nothing is actually moved from, but
// the std::move is misleading — confirm and consider dropping it.
1173 int r
= get_meta(dpp
, ioctx
, std::move(oid
), objv
, &info
, &size
, &over
, 0, y
,
// Only complain when this is not a probe that simply found nothing.
1176 if (!(probe
&& (r
== -ENOENT
|| r
== -ENODATA
))) {
1177 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1178 << " get_meta failed: r=" << r
<< dendl
;
1182 std::unique_ptr
<FIFO
> f(new FIFO(std::move(ioctx
), oid
));
1184 f
->part_header_size
= size
;
1185 f
->part_entry_overhead
= over
;
1186 // If there are journal entries, process them, in case
1187 // someone crashed mid-transaction.
1188 if (!info
.journal
.empty()) {
1190 << __PRETTY_FUNCTION__
<< ":" << __LINE__
1191 << " processing leftover journal" << dendl
;
1192 r
= f
->process_journal(dpp
, 0, y
);
1194 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1195 << " process_journal failed: r=" << r
<< dendl
;
// Ownership of the fully initialised FIFO passes to the caller.
1199 *fifo
= std::move(f
);
// Create the FIFO metadata object `oid` and then open it.
//
// Builds a create_meta write operation from the supplied parameters
// (`objv`, `oid_prefix`, `exclusive`, `max_part_size`, `max_entry_size`),
// submits it with rgw_rados_operate(), and on the visible path continues by
// calling open() with the same `ioctx`, `oid`, `fifo`, `y` and `objv`.
// NOTE(review): the failure check, returns and braces are not visible in this
// excerpt; the "create_meta failed" log is presumably guarded by `r < 0`.
1203 int FIFO::create(const DoutPrefixProvider
*dpp
, lr::IoCtx ioctx
, std::string oid
, std::unique_ptr
<FIFO
>* fifo
,
1204 optional_yield y
, std::optional
<fifo::objv
> objv
,
1205 std::optional
<std::string_view
> oid_prefix
,
1206 bool exclusive
, std::uint64_t max_part_size
,
1207 std::uint64_t max_entry_size
)
1210 << __PRETTY_FUNCTION__
<< ":" << __LINE__
1211 << " entering" << dendl
;
1212 lr::ObjectWriteOperation op
;
1213 create_meta(&op
, oid
, objv
, oid_prefix
, exclusive
, max_part_size
,
1215 auto r
= rgw_rados_operate(dpp
, ioctx
, oid
, &op
, y
);
1217 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1218 << " create_meta failed: r=" << r
<< dendl
;
// `ioctx` and `oid` are by-value parameters, so they can be handed off to
// open() once the create operation has been issued.
1221 r
= open(dpp
, std::move(ioctx
), std::move(oid
), fifo
, y
, objv
);
// Synchronously re-read the FIFO metadata and refresh the cached copy.
//
// Fetches metadata into temporaries (`_info`, `_phs`, `_peo`) with
// get_meta() and no expected object version (std::nullopt). Then, under `m`,
// the cached `info`, `part_header_size` and `part_entry_overhead` are replaced
// only if the fetched version is the same as or later than the cached one.
// NOTE(review): local declarations, the failure check, returns and braces are
// not visible in this excerpt.
1225 int FIFO::read_meta(const DoutPrefixProvider
*dpp
, std::uint64_t tid
, optional_yield y
) {
1226 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1227 << " entering: tid=" << tid
<< dendl
;
1232 auto r
= get_meta(dpp
, ioctx
, oid
, std::nullopt
, &_info
, &_phs
, &_peo
, tid
, y
);
1234 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1235 << " get_meta failed: r=" << r
<< " tid=" << tid
<< dendl
;
1238 std::unique_lock
l(m
);
// The check below adopts the fetched metadata when it is at least as new as
// the cached copy; an older fetch is ignored, presumably because a concurrent
// reader already installed a newer version.
1239 // We have a newer version already!
1240 if (_info
.version
.same_or_later(this->info
.version
)) {
1241 info
= std::move(_info
);
1242 part_header_size
= _phs
;
1243 part_entry_overhead
= _peo
;
// Convenience overload: allocate a fresh transaction id from `next_tid` while
// holding `m`, then delegate to read_meta(dpp, tid, y).
// NOTE(review): any unlock between taking the tid and the delegated call is
// not visible in this excerpt.
1248 int FIFO::read_meta(const DoutPrefixProvider
*dpp
, optional_yield y
) {
1249 std::unique_lock
l(m
);
1250 auto tid
= ++next_tid
;
1252 return read_meta(dpp
, tid
, y
);
// Completion handler for the asynchronous read_meta().
//
// handle() decodes a fifo::op::get_meta_reply from the buffer `bl`, and under
// the FIFO's mutex installs the decoded info / part header size / entry
// overhead if the decoded version is the same as or later than the cached
// one. A decode error (cb::error) is logged and converted to an error code
// with from_error_code(). The result `r` is forwarded via complete().
// NOTE(review): member declarations, the `try`, some branch conditions and
// braces are not visible in this excerpt.
1255 struct Reader
: public Completion
<Reader
> {
1259 Reader(const DoutPrefixProvider
*dpp
, FIFO
* fifo
, lr::AioCompletion
* super
, std::uint64_t tid
)
1260 : Completion(dpp
, super
), fifo(fifo
), tid(tid
) {}
// Invoked when the aio_exec issued by FIFO::read_meta() finishes.
1262 void handle(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
) {
1263 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1264 << " entering: tid=" << tid
<< dendl
;
1266 fifo::op::get_meta_reply reply
;
1267 auto iter
= bl
.cbegin();
1268 decode(reply
, iter
);
1269 std::unique_lock
l(fifo
->m
);
// Never replace the cached metadata with an older version.
1270 if (reply
.info
.version
.same_or_later(fifo
->info
.version
)) {
1271 fifo
->info
= std::move(reply
.info
);
1272 fifo
->part_header_size
= reply
.part_header_size
;
1273 fifo
->part_entry_overhead
= reply
.part_entry_overhead
;
1275 } catch (const cb::error
& err
) {
1276 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1277 << " failed to decode response err=" << err
.what()
1278 << " tid=" << tid
<< dendl
;
1279 r
= from_error_code(err
.code());
1281 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1282 << " read_meta failed r=" << r
1283 << " tid=" << tid
<< dendl
;
1285 complete(std::move(p
), r
);
// Asynchronous metadata read.
//
// Creates a Reader completion bound to this FIFO and `c`, then issues the
// cls-fifo GET_META call with ioctx.aio_exec(); the reply lands in the
// Reader's `bl` buffer and is processed in Reader::handle().
// NOTE(review): encoding of `gm` into `in`, handling of `r` and braces are
// not visible in this excerpt.
1289 void FIFO::read_meta(const DoutPrefixProvider
*dpp
, std::uint64_t tid
, lr::AioCompletion
* c
)
1291 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1292 << " entering: tid=" << tid
<< dendl
;
1293 lr::ObjectReadOperation op
;
1294 fifo::op::get_meta gm
;
1297 auto reader
= std::make_unique
<Reader
>(dpp
, this, c
, tid
);
// Keep a raw pointer: `reader` is moved into Reader::call() below, but the
// output buffer `rp->bl` still has to be named in the same call.
1298 auto rp
= reader
.get();
1299 auto r
= ioctx
.aio_exec(oid
, Reader::call(std::move(reader
)), fifo::op::CLASS
,
1300 fifo::op::GET_META
, in
, &rp
->bl
);
// Const accessor returning a reference to a fifo::info.
// NOTE(review): the body is not visible in this excerpt; presumably it returns
// the cached `info` member — confirm, and note that callers receive a
// reference without any locking being visible at this level.
1304 const fifo::info
& FIFO::meta() const {
// Returns the pair {part_header_size, part_entry_overhead} as currently
// cached on this FIFO.
1308 std::pair
<std::uint32_t, std::uint32_t> FIFO::get_part_layout_info() const {
1309 return {part_header_size
, part_entry_overhead
};
// Synchronous single-entry push: wraps `bl` in a one-element std::vector and
// forwards to the vector overload of push().
1312 int FIFO::push(const DoutPrefixProvider
*dpp
, const cb::list
& bl
, optional_yield y
) {
1313 return push(dpp
, std::vector
{ bl
}, y
);
// Asynchronous single-entry push: wraps `bl` in a one-element std::vector and
// forwards to the vector overload of push() that completes through `c`.
1316 void FIFO::push(const DoutPrefixProvider
*dpp
, const cb::list
& bl
, lr::AioCompletion
* c
) {
1317 push(dpp
, std::vector
{ bl
}, c
);
// Synchronous push of a batch of entries.
//
// Visible flow:
//  1. Under `m`, take a new tid and snapshot max_entry_size, need_new_head()
//     and head_part_num from `info`.
//  2. An empty `data_bufs` is logged as a successful no-op.
//  3. Any entry longer than max_entry_size is rejected (logged at level -1).
//  4. If a new head part is needed, _prepare_new_head(head_part_num + 1) is
//     called first.
//  5. Entries are moved from `remaining` into `batch` while they fit within
//     max_part_size (per-entry overhead is added to the running length after
//     the fit check), then sent with push_entries(). Depending on the result
//     the loop prepares a new head, re-reads metadata after a racing trim, or
//     drops the entries that were accepted and recomputes `batch_len`.
//  6. The loop is bounded by MAX_RACE_RETRIES.
// NOTE(review): many lines of this definition (declarations of `r` and
// `retries`, branch conditions, returns, unlocks, braces) are not visible in
// this excerpt; the summary above covers only what is shown.
1320 int FIFO::push(const DoutPrefixProvider
*dpp
, const std::vector
<cb::list
>& data_bufs
, optional_yield y
)
1322 std::unique_lock
l(m
);
1323 auto tid
= ++next_tid
;
1324 auto max_entry_size
= info
.params
.max_entry_size
;
1325 auto need_new_head
= info
.need_new_head();
1326 auto head_part_num
= info
.head_part_num
;
1328 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1329 << " entering: tid=" << tid
<< dendl
;
1330 if (data_bufs
.empty()) {
1331 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1332 << " empty push, returning success tid=" << tid
<< dendl
;
// Validate every entry against the configured maximum entry size.
1337 for (const auto& bl
: data_bufs
) {
1338 if (bl
.length() > max_entry_size
) {
1339 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1340 << " entry bigger than max_entry_size tid=" << tid
<< dendl
;
1346 if (need_new_head
) {
1347 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1348 << " need new head tid=" << tid
<< dendl
;
1349 r
= _prepare_new_head(dpp
, head_part_num
+ 1, tid
, y
);
1351 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1352 << " _prepare_new_head failed: r=" << r
1353 << " tid=" << tid
<< dendl
;
// `remaining` holds entries not yet batched; `batch` holds entries queued for
// the next push_entries() call.
1358 std::deque
<cb::list
> remaining(data_bufs
.begin(), data_bufs
.end());
1359 std::deque
<cb::list
> batch
;
1361 uint64_t batch_len
= 0;
1363 bool canceled
= true;
1364 while ((!remaining
.empty() || !batch
.empty()) &&
1365 (retries
<= MAX_RACE_RETRIES
)) {
1366 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1367 << " preparing push: remaining=" << remaining
.size()
1368 << " batch=" << batch
.size() << " retries=" << retries
1369 << " tid=" << tid
<< dendl
;
// Refresh the values that may have changed since the last iteration.
1370 std::unique_lock
l(m
);
1371 head_part_num
= info
.head_part_num
;
1372 auto max_part_size
= info
.params
.max_part_size
;
1373 auto overhead
= part_entry_overhead
;
1376 while (!remaining
.empty() &&
1377 (remaining
.front().length() + batch_len
<= max_part_size
)) {
1378 /* We can send entries with data_len up to max_entry_size,
1379 however, we want to also account the overhead when
1380 dealing with multiple entries. Previous check doesn't
1381 account for overhead on purpose. */
1382 batch_len
+= remaining
.front().length() + overhead
;
1383 batch
.push_back(std::move(remaining
.front()));
1384 remaining
.pop_front();
1386 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1387 << " prepared push: remaining=" << remaining
.size()
1388 << " batch=" << batch
.size() << " retries=" << retries
1389 << " batch_len=" << batch_len
1390 << " tid=" << tid
<< dendl
;
1392 auto r
= push_entries(dpp
, batch
, tid
, y
);
1396 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1397 << " need new head tid=" << tid
<< dendl
;
1398 r
= _prepare_new_head(dpp
, head_part_num
+ 1, tid
, y
);
1400 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1401 << " prepare_new_head failed: r=" << r
1402 << " tid=" << tid
<< dendl
;
1409 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1410 << " racing client trimmed part, rereading metadata "
1411 << "tid=" << tid
<< dendl
;
1414 r
= read_meta(dpp
, y
);
1416 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1417 << " read_meta failed: r=" << r
1418 << " tid=" << tid
<< dendl
;
1425 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1426 << " push_entries failed: r=" << r
1427 << " tid=" << tid
<< dendl
;
1430 // Made forward progress!
// `r` is compared against the batch size and used as a count of leading
// entries to erase, i.e. it is treated as the number of entries accepted.
1434 if (r
== ssize(batch
)) {
1437 batch
.erase(batch
.begin(), batch
.begin() + r
);
1438 for (const auto& b
: batch
) {
1439 batch_len
+= b
.length() + part_entry_overhead
;
1444 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1445 << " canceled too many times, giving up: tid=" << tid
<< dendl
;
// State machine driving the asynchronous multi-entry push.
//
// `remaining` holds entries not yet batched, `batch` the entries queued for
// the next push_entries() call. `state` records which asynchronous step
// (pushing, new_heading, meta_reading) the next handle() callback belongs to.
// NOTE(review): some member declarations (e.g. `f`, `i`, `tid`), branch
// conditions, returns and braces are not visible in this excerpt.
1451 struct Pusher
: public Completion
<Pusher
> {
1453 std::deque
<cb::list
> remaining
;
1454 std::deque
<cb::list
> batch
;
1456 std::int64_t head_part_num
;
1458 enum { pushing
, new_heading
, meta_reading
} state
= pushing
;
// Drop the first `successes` entries of the current batch (or the whole batch
// when all were accepted), refill the batch from `remaining` while entries
// fit within max_part_size, and complete with 0 when nothing is left.
1460 void prep_then_push(const DoutPrefixProvider
*dpp
, Ptr
&& p
, const unsigned successes
) {
1461 std::unique_lock
l(f
->m
);
1462 auto max_part_size
= f
->info
.params
.max_part_size
;
1463 auto part_entry_overhead
= f
->part_entry_overhead
;
1464 head_part_num
= f
->info
.head_part_num
;
1467 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1468 << " preparing push: remaining=" << remaining
.size()
1469 << " batch=" << batch
.size() << " i=" << i
1470 << " tid=" << tid
<< dendl
;
1472 uint64_t batch_len
= 0;
1473 if (successes
> 0) {
1474 if (successes
== batch
.size()) {
1477 batch
.erase(batch
.begin(), batch
.begin() + successes
);
1478 for (const auto& b
: batch
) {
1479 batch_len
+= b
.length() + part_entry_overhead
;
1484 if (batch
.empty() && remaining
.empty()) {
1485 complete(std::move(p
), 0);
1489 while (!remaining
.empty() &&
1490 (remaining
.front().length() + batch_len
<= max_part_size
)) {
1492 /* We can send entries with data_len up to max_entry_size,
1493 however, we want to also account the overhead when
1494 dealing with multiple entries. Previous check doesn't
1495 account for overhead on purpose. */
1496 batch_len
+= remaining
.front().length() + part_entry_overhead
;
1497 batch
.push_back(std::move(remaining
.front()));
1498 remaining
.pop_front();
1500 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1501 << " prepared push: remaining=" << remaining
.size()
1502 << " batch=" << batch
.size() << " i=" << i
1503 << " batch_len=" << batch_len
1504 << " tid=" << tid
<< dendl
;
// Submit the current batch; this object is re-armed as the completion.
1508 void push(Ptr
&& p
) {
1509 f
->push_entries(batch
, tid
, call(std::move(p
)));
// Ask the FIFO to prepare part head_part_num + 1 as the new head.
1512 void new_head(const DoutPrefixProvider
*dpp
, Ptr
&& p
) {
1513 state
= new_heading
;
1514 f
->_prepare_new_head(dpp
, head_part_num
+ 1, tid
, call(std::move(p
)));
// Re-read the FIFO metadata asynchronously.
1517 void read_meta(const DoutPrefixProvider
*dpp
, Ptr
&& p
) {
1519 state
= meta_reading
;
1520 f
->read_meta(dpp
, tid
, call(std::move(p
)));
// Callback for every asynchronous step; the visible branches cover: a push
// that needs a new head, a push that raced with a part deletion (bounded by
// MAX_RACE_RETRIES), push failure, push progress, and the results of the
// new-head and metadata-read steps.
1523 void handle(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
) {
1527 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1528 << " need new head tid=" << tid
<< dendl
;
1529 new_head(dpp
, std::move(p
));
1533 if (i
> MAX_RACE_RETRIES
) {
1534 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1535 << " racing client deleted part, but we're out"
1536 << " of retries: tid=" << tid
<< dendl
;
1537 complete(std::move(p
), r
);
1539 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1540 << " racing client deleted part: tid=" << tid
<< dendl
;
1541 read_meta(dpp
, std::move(p
));
1545 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1546 << " push_entries failed: r=" << r
1547 << " tid=" << tid
<< dendl
;
1548 complete(std::move(p
), r
);
1551 i
= 0; // We've made forward progress, so reset the race counter!
1552 prep_then_push(dpp
, std::move(p
), r
);
1557 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1558 << " prepare_new_head failed: r=" << r
1559 << " tid=" << tid
<< dendl
;
1560 complete(std::move(p
), r
);
1564 handle_new_head(dpp
, std::move(p
), r
);
1569 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1570 << " read_meta failed: r=" << r
1571 << " tid=" << tid
<< dendl
;
1572 complete(std::move(p
), r
);
1576 prep_then_push(dpp
, std::move(p
), r
);
// Result of _prepare_new_head(): -ECANCELED is retried up to the race limit,
// and an empty batch is refilled before pushing.
// NOTE(review): this limit check uses `== MAX_RACE_RETRIES` while handle()
// uses `> MAX_RACE_RETRIES`; confirm the difference is intentional.
1581 void handle_new_head(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
) {
1582 if (r
== -ECANCELED
) {
1583 if (p
->i
== MAX_RACE_RETRIES
) {
1584 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1585 << " canceled too many times, giving up: tid=" << tid
<< dendl
;
1586 complete(std::move(p
), -ECANCELED
);
1591 complete(std::move(p
), r
);
1595 if (p
->batch
.empty()) {
1596 prep_then_push(dpp
, std::move(p
), 0);
// Takes ownership of the entries to push (`remaining`) and records the head
// part number and tid observed by the caller.
1604 Pusher(const DoutPrefixProvider
*dpp
, FIFO
* f
, std::deque
<cb::list
>&& remaining
,
1605 std::int64_t head_part_num
, std::uint64_t tid
,
1606 lr::AioCompletion
* super
)
1607 : Completion(dpp
, super
), f(f
), remaining(std::move(remaining
)),
1608 head_part_num(head_part_num
), tid(tid
) {}
// Asynchronous push of a batch of entries.
//
// Under `m`, takes a new tid and snapshots max_entry_size, need_new_head()
// and head_part_num. A Pusher is created up front so that every outcome —
// including the early -E2BIG rejection and the empty-input success — is
// reported through the same completion `c`.
// NOTE(review): unlocks, returns and braces are not visible in this excerpt.
1611 void FIFO::push(const DoutPrefixProvider
*dpp
, const std::vector
<cb::list
>& data_bufs
,
1612 lr::AioCompletion
* c
)
1614 std::unique_lock
l(m
);
1615 auto tid
= ++next_tid
;
1616 auto max_entry_size
= info
.params
.max_entry_size
;
1617 auto need_new_head
= info
.need_new_head();
1618 auto head_part_num
= info
.head_part_num
;
1620 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1621 << " entering: tid=" << tid
<< dendl
;
1622 auto p
= std::make_unique
<Pusher
>(dpp
, this, std::deque
<cb::list
>(data_bufs
.begin(), data_bufs
.end()),
1623 head_part_num
, tid
, c
);
// Reject the whole push if any single entry exceeds max_entry_size.
1625 for (const auto& bl
: data_bufs
) {
1626 if (bl
.length() > max_entry_size
) {
1627 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1628 << " entry bigger than max_entry_size tid=" << tid
<< dendl
;
1629 Pusher::complete(std::move(p
), -E2BIG
);
1634 if (data_bufs
.empty() ) {
1635 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1636 << " empty push, returning success tid=" << tid
<< dendl
;
1637 Pusher::complete(std::move(p
), 0);
// Pusher::new_head() and Pusher::prep_then_push() take `Ptr&&`, so using `p->`
// and `std::move(p)` in the same call does not move `p` at the call site.
1641 if (need_new_head
) {
1642 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1643 << " need new head tid=" << tid
<< dendl
;
1644 p
->new_head(dpp
, std::move(p
));
1646 p
->prep_then_push(dpp
, std::move(p
), 0);
// Synchronous listing of up to `max_entries` entries.
//
// Starts at the tail part (or at the position decoded from `markstr`) and
// repeatedly calls list_part() on successive part objects, converting each
// fifo::part_list_entry into a list_entry whose marker is built from
// {part_num, entry.ofs}. A missing part triggers a metadata re-read; if the
// current part is now below the tail, the listing restarts from the new tail.
// Results are handed back through `*presult`.
// NOTE(review): many lines of this definition (remaining parameters, branch
// conditions, loop bookkeeping, returns, braces) are not visible in this
// excerpt; the summary covers only what is shown.
1650 int FIFO::list(const DoutPrefixProvider
*dpp
, int max_entries
,
1651 std::optional
<std::string_view
> markstr
,
1652 std::vector
<list_entry
>* presult
, bool* pmore
,
1655 std::unique_lock
l(m
);
1656 auto tid
= ++next_tid
;
1657 std::int64_t part_num
= info
.tail_part_num
;
1659 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1660 << " entering: tid=" << tid
<< dendl
;
1661 std::uint64_t ofs
= 0;
// Optional starting position supplied by the caller.
1663 auto marker
= to_marker(*markstr
);
1665 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1666 << " invalid marker string: " << markstr
1667 << " tid= "<< tid
<< dendl
;
1670 part_num
= marker
->num
;
1674 std::vector
<list_entry
> result
;
1675 result
.reserve(max_entries
);
1678 std::vector
<fifo::part_list_entry
> entries
;
1680 while (max_entries
> 0) {
1681 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1682 << " max_entries=" << max_entries
<< " tid=" << tid
<< dendl
;
1683 bool part_more
= false;
1684 bool part_full
= false;
1686 std::unique_lock
l(m
);
1687 auto part_oid
= info
.part_oid(part_num
);
1690 r
= list_part(dpp
, ioctx
, part_oid
, ofs
, max_entries
, &entries
,
1691 &part_more
, &part_full
, tid
, y
);
1693 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1694 << " missing part, rereading metadata"
1695 << " tid= "<< tid
<< dendl
;
1696 r
= read_meta(dpp
, tid
, y
);
1698 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1699 << " read_meta failed: r=" << r
1700 << " tid= "<< tid
<< dendl
;
1703 if (part_num
< info
.tail_part_num
) {
1704 /* raced with trim? restart */
1705 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1706 << " raced with trim, restarting: tid=" << tid
<< dendl
;
// Give back the budget already consumed by the results gathered so far.
1707 max_entries
+= result
.size();
1709 std::unique_lock
l(m
);
1710 part_num
= info
.tail_part_num
;
1715 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1716 << " assuming part was not written yet, so end of data: "
1717 << "tid=" << tid
<< dendl
;
1723 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1724 << " list_entries failed: r=" << r
1725 << " tid= "<< tid
<< dendl
;
1728 more
= part_full
|| part_more
;
// Convert part-level entries into caller-facing list entries.
1729 for (auto& entry
: entries
) {
1731 e
.data
= std::move(entry
.data
);
1732 e
.marker
= marker
{part_num
, entry
.ofs
}.to_string();
1733 e
.mtime
= entry
.mtime
;
1734 result
.push_back(std::move(e
));
1736 if (max_entries
== 0)
1740 if (max_entries
> 0 &&
1745 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1746 << " head part is not full, so we can assume we're done: "
1747 << "tid=" << tid
<< dendl
;
1756 *presult
= std::move(result
);
// Synchronously trim the FIFO up to the position encoded in `markstr`.
//
// Visible flow:
//  1. Decode the marker into (part_num, ofs); under `m` take a new tid and
//     snapshot head_part_num and max_part_size.
//  2. If the marker's part is beyond the known head, re-read the metadata;
//     if it is still beyond the head, trim whole parts (`ofs = max_part_size`).
//  3. Trim every part from the current tail up to (not including) part_num in
//     full, then trim part_num itself at `ofs` honouring `exclusive`.
//  4. Advance tail_part_num in the metadata via _update_meta(), retrying on
//     cancellation up to MAX_RACE_RETRIES times.
//  5. Return -ENODATA when `overshoot` was set, otherwise 0.
// NOTE(review): several lines of this definition (declarations of `r` and
// `retries`, some branch bodies, returns, unlocks, braces) are not visible in
// this excerpt; the summary covers only what is shown.
1762 int FIFO::trim(const DoutPrefixProvider
*dpp
, std::string_view markstr
, bool exclusive
, optional_yield y
)
1764 bool overshoot
= false;
1765 auto marker
= to_marker(markstr
);
1769 auto part_num
= marker
->num
;
1770 auto ofs
= marker
->ofs
;
1771 std::unique_lock
l(m
);
1772 auto tid
= ++next_tid
;
1773 auto hn
= info
.head_part_num
;
1774 const auto max_part_size
= info
.params
.max_part_size
;
// The marker may point past the head we know about; refresh before deciding.
1775 if (part_num
> hn
) {
1777 auto r
= read_meta(dpp
, tid
, y
);
1782 auto hn
= info
.head_part_num
;
1783 if (part_num
> hn
) {
1786 ofs
= max_part_size
;
1789 if (part_num
< info
.tail_part_num
) {
1792 auto pn
= info
.tail_part_num
;
1794 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1795 << " entering: tid=" << tid
<< dendl
;
// Fully trim every part that precedes the marker's part.
1798 while (pn
< part_num
) {
1799 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1800 << " pn=" << pn
<< " tid=" << tid
<< dendl
;
1801 std::unique_lock
l(m
);
1803 r
= trim_part(dpp
, pn
, max_part_size
, false, tid
, y
);
// A part that no longer exists (-ENOENT) has already been trimmed and is not
// an error; any other negative result is. This matches the check applied to
// the final trim_part() call further down. (The comparison was previously
// `== -ENOENT`, which treated only the benign case as a failure.)
1804 if (r
< 0 && r
!= -ENOENT
) {
1805 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1806 << " trim_part failed: r=" << r
1807 << " tid= "<< tid
<< dendl
;
// Trim the marker's own part at the requested offset.
1812 r
= trim_part(dpp
, part_num
, ofs
, exclusive
, tid
, y
);
1813 if (r
< 0 && r
!= -ENOENT
) {
1814 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1815 << " trim_part failed: r=" << r
1816 << " tid= "<< tid
<< dendl
;
1821 auto tail_part_num
= info
.tail_part_num
;
1822 auto objv
= info
.version
;
// Only attempt a metadata update when the tail actually has to move.
1824 bool canceled
= tail_part_num
< part_num
;
1826 while ((tail_part_num
< part_num
) &&
1828 (retries
<= MAX_RACE_RETRIES
)) {
1829 r
= _update_meta(dpp
, fifo::update
{}.tail_part_num(part_num
), objv
, &canceled
,
1832 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1833 << " _update_meta failed: r=" << r
1834 << " tid= "<< tid
<< dendl
;
1838 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1839 << " canceled: retries=" << retries
1840 << " tid=" << tid
<< dendl
;
// Pick up whatever a racing writer installed before retrying.
1842 tail_part_num
= info
.tail_part_num
;
1843 objv
= info
.version
;
1849 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1850 << " canceled too many times, giving up: tid=" << tid
<< dendl
;
1853 return overshoot
? -ENODATA
: 0;
// State machine driving the asynchronous trim.
//
// Flags visible here: `reread` (a metadata re-read was issued first),
// `update` (the next callback belongs to the metadata-update phase),
// `canceled` (out-parameter of _update_meta, also pre-set from
// `tail_part_num < part_num`), and `overshoot` (selects -ENODATA as the final
// result). `pn` walks the parts that precede `part_num`.
// NOTE(review): some member declarations (e.g. `fifo`, `ofs`, `pn`,
// `exclusive`, `tid`, `retries`), branch conditions, returns and braces are
// not visible in this excerpt.
1856 struct Trimmer
: public Completion
<Trimmer
> {
1858 std::int64_t part_num
;
1863 bool update
= false;
1864 bool reread
= false;
1865 bool canceled
= false;
1866 bool overshoot
= false;
1869 Trimmer(const DoutPrefixProvider
*dpp
, FIFO
* fifo
, std::int64_t part_num
, std::uint64_t ofs
, std::int64_t pn
,
1870 bool exclusive
, lr::AioCompletion
* super
, std::uint64_t tid
)
1871 : Completion(dpp
, super
), fifo(fifo
), part_num(part_num
), ofs(ofs
), pn(pn
),
1872 exclusive(exclusive
), tid(tid
) {}
// Callback for every asynchronous step of the trim. The visible sections
// handle, in order: the result of the initial metadata re-read, failures of a
// trim/update step, the "preceding part trimmed" callback (continue with the
// next part or trim `part_num` itself), and the metadata update phase with
// its retry limit.
1874 void handle(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
) {
1875 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1876 << " entering: tid=" << tid
<< dendl
;
1881 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1882 << " read_meta failed: r="
1883 << r
<< " tid=" << tid
<< dendl
;
1884 complete(std::move(p
), r
);
1887 std::unique_lock
l(fifo
->m
);
1888 auto hn
= fifo
->info
.head_part_num
;
1889 const auto max_part_size
= fifo
->info
.params
.max_part_size
;
1890 const auto tail_part_num
= fifo
->info
.tail_part_num
;
// Marker still beyond the head after the re-read: trim whole parts.
1892 if (part_num
> hn
) {
1894 ofs
= max_part_size
;
// Marker is already behind the tail: nothing left to trim.
1897 if (part_num
< tail_part_num
) {
1898 complete(std::move(p
), -ENODATA
);
1902 if (pn
< part_num
) {
1903 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1904 << " pn=" << pn
<< " tid=" << tid
<< dendl
;
1905 fifo
->trim_part(dpp
, pn
++, max_part_size
, false, tid
,
1906 call(std::move(p
)));
1909 canceled
= tail_part_num
< part_num
;
1910 fifo
->trim_part(dpp
, part_num
, ofs
, exclusive
, tid
, call(std::move(p
)));
1920 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1921 << (update
? " update_meta " : " trim ") << "failed: r="
1922 << r
<< " tid=" << tid
<< dendl
;
1923 complete(std::move(p
), r
);
1928 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1929 << " handling preceding trim callback: tid=" << tid
<< dendl
;
// More preceding parts to trim in full before reaching `part_num`.
1931 if (pn
< part_num
) {
1932 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1933 << " pn=" << pn
<< " tid=" << tid
<< dendl
;
1934 std::unique_lock
l(fifo
->m
);
1935 const auto max_part_size
= fifo
->info
.params
.max_part_size
;
1937 fifo
->trim_part(dpp
, pn
++, max_part_size
, false, tid
,
1938 call(std::move(p
)));
1942 std::unique_lock
l(fifo
->m
);
1943 const auto tail_part_num
= fifo
->info
.tail_part_num
;
1946 canceled
= tail_part_num
< part_num
;
1947 fifo
->trim_part(dpp
, part_num
, ofs
, exclusive
, tid
, call(std::move(p
)));
1951 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1952 << " handling update-needed callback: tid=" << tid
<< dendl
;
1953 std::unique_lock
l(fifo
->m
);
1954 auto tail_part_num
= fifo
->info
.tail_part_num
;
1955 auto objv
= fifo
->info
.version
;
// Keep trying to move the tail forward until it is done or retries run out.
1957 if ((tail_part_num
< part_num
) &&
1959 if (retries
> MAX_RACE_RETRIES
) {
1960 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1961 << " canceled too many times, giving up: tid=" << tid
<< dendl
;
1962 complete(std::move(p
), -EIO
);
1966 fifo
->_update_meta(dpp
, fifo::update
{}
1967 .tail_part_num(part_num
), objv
, &canceled
,
1968 tid
, call(std::move(p
)));
1970 complete(std::move(p
), overshoot
? -ENODATA
: 0);
// Asynchronous trim up to the position encoded in `markstr`.
//
// The marker is decoded first; `value_or(marker{})` supplies a default so a
// Trimmer can always be constructed and used to report the outcome through
// `c` (the visible early completion passes -EINVAL). Under `m` the head part
// number, max_part_size, tail part number and a new tid are captured. If the
// marker is beyond the known head a metadata re-read is issued first;
// otherwise trimming starts at the tail part `pn`, using a full-part offset
// when `pn` precedes the marker's part.
// NOTE(review): branch conditions for the -EINVAL path, unlocks, returns and
// braces are not visible in this excerpt.
1975 void FIFO::trim(const DoutPrefixProvider
*dpp
, std::string_view markstr
, bool exclusive
,
1976 lr::AioCompletion
* c
) {
1977 auto marker
= to_marker(markstr
);
1978 auto realmark
= marker
.value_or(::rgw::cls::fifo::marker
{});
1979 std::unique_lock
l(m
);
1980 const auto hn
= info
.head_part_num
;
1981 const auto max_part_size
= info
.params
.max_part_size
;
1982 const auto pn
= info
.tail_part_num
;
1983 const auto part_oid
= info
.part_oid(pn
);
1984 auto tid
= ++next_tid
;
1986 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1987 << " entering: tid=" << tid
<< dendl
;
1988 auto trimmer
= std::make_unique
<Trimmer
>(dpp
, this, realmark
.num
, realmark
.ofs
,
1989 pn
, exclusive
, c
, tid
);
1991 Trimmer::complete(std::move(trimmer
), -EINVAL
);
1995 auto ofs
= marker
->ofs
;
// Marker beyond the known head: refresh metadata and let Trimmer decide.
1996 if (marker
->num
> hn
) {
1997 trimmer
->reread
= true;
1998 read_meta(dpp
, tid
, Trimmer::call(std::move(trimmer
)));
// The tail part precedes the marker's part, so it is trimmed in full.
2001 if (pn
< marker
->num
) {
2002 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2003 << " pn=" << pn
<< " tid=" << tid
<< dendl
;
2004 ofs
= max_part_size
;
2006 trimmer
->update
= true;
2008 trim_part(dpp
, pn
, ofs
, exclusive
, tid
, Trimmer::call(std::move(trimmer
)));
// Synchronously fetch the header of part `part_num` into `*header`.
//
// Under `m`, resolves the part object name and takes a new tid, then calls
// the free function rgw::cls::fifo::get_part_info().
// NOTE(review): the end of the parameter list, the failure check, returns and
// braces are not visible in this excerpt.
2011 int FIFO::get_part_info(const DoutPrefixProvider
*dpp
, int64_t part_num
,
2012 fifo::part_header
* header
,
2015 std::unique_lock
l(m
);
2016 const auto part_oid
= info
.part_oid(part_num
);
2017 auto tid
= ++next_tid
;
2019 auto r
= rgw::cls::fifo::get_part_info(dpp
, ioctx
, part_oid
, header
, tid
, y
);
2021 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2022 << " get_part_info failed: r="
2023 << r
<< " tid=" << tid
<< dendl
;
// Asynchronously fetch the header of part `part_num` into `*header`.
//
// Under `m`, resolves the part object name and takes a new tid; builds the
// read operation with rgw::cls::fifo::get_part_info(cct, header, tid) and
// submits it with ioctx.aio_operate(), completing through `c`. Submission is
// asserted to succeed (`r >= 0`).
// NOTE(review): unlocks and braces are not visible in this excerpt.
2028 void FIFO::get_part_info(int64_t part_num
,
2029 fifo::part_header
* header
,
2030 lr::AioCompletion
* c
)
2032 std::unique_lock
l(m
);
2033 const auto part_oid
= info
.part_oid(part_num
);
2034 auto tid
= ++next_tid
;
2036 auto op
= rgw::cls::fifo::get_part_info(cct
, header
, tid
);
2037 auto r
= ioctx
.aio_operate(part_oid
, c
, &op
, nullptr);
2038 ceph_assert(r
>= 0);
// Completion used by FIFO::get_head_info(): first the metadata is re-read,
// then the head part's header is fetched and handed to the callback `f`.
//
// `headerread` distinguishes the two callback phases. In the first phase the
// head part number is taken from fifo->meta(); the visible "no head" path
// returns an empty part header. In the second phase `f(r, header)` is invoked
// and the completion finishes with `r`.
// NOTE(review): `f` is stored as fu2::function while FIFO::get_head_info()
// accepts a fu2::unique_function — confirm the conversion is intended.
// NOTE(review): some member declarations, branch conditions, returns and
// braces are not visible in this excerpt.
2041 struct InfoGetter
: Completion
<InfoGetter
> {
2043 fifo::part_header header
;
2044 fu2::function
<void(int r
, fifo::part_header
&&)> f
;
2046 bool headerread
= false;
2048 InfoGetter(const DoutPrefixProvider
*dpp
, FIFO
* fifo
, fu2::function
<void(int r
, fifo::part_header
&&)> f
,
2049 std::uint64_t tid
, lr::AioCompletion
* super
)
2050 : Completion(dpp
, super
), fifo(fifo
), f(std::move(f
)), tid(tid
) {}
2051 void handle(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
) {
2054 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2055 << " read_meta failed: r="
2056 << r
<< " tid=" << tid
<< dendl
;
2059 complete(std::move(p
), r
);
// Snapshot of the metadata after the re-read.
2063 auto info
= fifo
->meta();
2064 auto hpn
= info
.head_part_num
;
2066 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2067 << " no head, returning empty partinfo r="
2068 << r
<< " tid=" << tid
<< dendl
;
2071 complete(std::move(p
), r
);
// Second step: read the head part's header into `header`.
2075 auto op
= rgw::cls::fifo::get_part_info(fifo
->cct
, &header
, tid
);
2076 std::unique_lock
l(fifo
->m
);
2077 auto oid
= fifo
->info
.part_oid(hpn
);
2079 r
= fifo
->ioctx
.aio_operate(oid
, call(std::move(p
)), &op
,
2081 ceph_assert(r
>= 0);
2086 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2087 << " get_part_info failed: r="
2088 << r
<< " tid=" << tid
<< dendl
;
// Deliver the header to the user callback, then finish the completion.
2092 f(r
, std::move(header
));
2093 complete(std::move(p
), r
);
// Asynchronously obtain the header of the current head part.
//
// Takes a new tid under `m`, wraps the callback `f` in an InfoGetter and
// starts the sequence with an asynchronous read_meta(); the InfoGetter then
// performs the follow-up steps and completes through `c`.
// NOTE(review): unlocks and braces are not visible in this excerpt.
2098 void FIFO::get_head_info(const DoutPrefixProvider
*dpp
, fu2::unique_function
<void(int r
,
2099 fifo::part_header
&&)> f
,
2100 lr::AioCompletion
* c
)
2102 std::unique_lock
l(m
);
2103 auto tid
= ++next_tid
;
2105 auto ig
= std::make_unique
<InfoGetter
>(dpp
, this, std::move(f
), tid
, c
);
2106 read_meta(dpp
, tid
, InfoGetter::call(std::move(ig
)));
2109 struct JournalProcessor
: public Completion
<JournalProcessor
> {
2113 std::vector
<fifo::journal_entry
> processed
;
2114 decltype(fifo
->info
.journal
) journal
;
2115 decltype(journal
)::iterator iter
;
2116 std::int64_t new_tail
;
2117 std::int64_t new_head
;
2118 std::int64_t new_max
;
2119 int race_retries
= 0;
2120 bool first_pp
= true;
2121 bool canceled
= false;
2129 void create_part(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int64_t part_num
) {
2130 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2131 << " entering: tid=" << tid
<< dendl
;
2132 state
= entry_callback
;
2133 lr::ObjectWriteOperation op
;
2134 op
.create(false); /* We don't need exclusivity, part_init ensures
2135 we're creating from the same journal entry. */
2136 std::unique_lock
l(fifo
->m
);
2137 part_init(&op
, fifo
->info
.params
);
2138 auto oid
= fifo
->info
.part_oid(part_num
);
2140 auto r
= fifo
->ioctx
.aio_operate(oid
, call(std::move(p
)), &op
);
2141 ceph_assert(r
>= 0);
2145 void remove_part(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int64_t part_num
) {
2146 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2147 << " entering: tid=" << tid
<< dendl
;
2148 state
= entry_callback
;
2149 lr::ObjectWriteOperation op
;
2151 std::unique_lock
l(fifo
->m
);
2152 auto oid
= fifo
->info
.part_oid(part_num
);
2154 auto r
= fifo
->ioctx
.aio_operate(oid
, call(std::move(p
)), &op
);
2155 ceph_assert(r
>= 0);
2159 void finish_je(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
,
2160 const fifo::journal_entry
& entry
) {
2161 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2162 << " entering: tid=" << tid
<< dendl
;
2164 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2165 << " finishing entry: entry=" << entry
2166 << " tid=" << tid
<< dendl
;
2168 using enum fifo::journal_entry::Op
;
2169 if (entry
.op
== remove
&& r
== -ENOENT
)
2173 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2174 << " processing entry failed: entry=" << entry
2175 << " r=" << r
<< " tid=" << tid
<< dendl
;
2176 complete(std::move(p
), r
);
2182 // Can't happen. Filtered out in process.
2183 complete(std::move(p
), -EIO
);
2187 if (entry
.part_num
> new_max
) {
2188 new_max
= entry
.part_num
;
2192 if (entry
.part_num
>= new_tail
) {
2193 new_tail
= entry
.part_num
+ 1;
2197 processed
.push_back(entry
);
2200 process(dpp
, std::move(p
));
2203 void postprocess(const DoutPrefixProvider
*dpp
, Ptr
&& p
) {
2204 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2205 << " entering: tid=" << tid
<< dendl
;
2206 if (processed
.empty()) {
2207 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2208 << " nothing to update any more: race_retries="
2209 << race_retries
<< " tid=" << tid
<< dendl
;
2210 complete(std::move(p
), 0);
2213 pp_run(dpp
, std::move(p
), 0, false);
2218 JournalProcessor(const DoutPrefixProvider
*dpp
, FIFO
* fifo
, std::uint64_t tid
, lr::AioCompletion
* super
)
2219 : Completion(dpp
, super
), fifo(fifo
), tid(tid
) {
2220 std::unique_lock
l(fifo
->m
);
2221 journal
= fifo
->info
.journal
;
2222 iter
= journal
.begin();
2223 new_tail
= fifo
->info
.tail_part_num
;
2224 new_head
= fifo
->info
.head_part_num
;
2225 new_max
= fifo
->info
.max_push_part_num
;
2228 void pp_run(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
, bool canceled
) {
2229 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2230 << " entering: tid=" << tid
<< dendl
;
2231 std::optional
<int64_t> tail_part_num
;
2232 std::optional
<int64_t> head_part_num
;
2233 std::optional
<int64_t> max_part_num
;
2236 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2237 << " failed, r=: " << r
<< " tid=" << tid
<< dendl
;
2238 complete(std::move(p
), r
);
2242 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2243 << " postprocessing: race_retries="
2244 << race_retries
<< " tid=" << tid
<< dendl
;
2246 if (!first_pp
&& r
== 0 && !canceled
) {
2247 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2248 << " nothing to update any more: race_retries="
2249 << race_retries
<< " tid=" << tid
<< dendl
;
2250 complete(std::move(p
), 0);
2257 if (race_retries
>= MAX_RACE_RETRIES
) {
2258 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2259 << " canceled too many times, giving up: tid="
2261 complete(std::move(p
), -ECANCELED
);
2264 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2265 << " update canceled, retrying: race_retries="
2266 << race_retries
<< " tid=" << tid
<< dendl
;
2270 std::vector
<fifo::journal_entry
> new_processed
;
2271 std::unique_lock
l(fifo
->m
);
2272 for (auto& e
: processed
) {
2273 if (fifo
->info
.journal
.contains(e
)) {
2274 new_processed
.push_back(e
);
2277 processed
= std::move(new_processed
);
2280 std::unique_lock
l(fifo
->m
);
2281 auto objv
= fifo
->info
.version
;
2282 if (new_tail
> fifo
->info
.tail_part_num
) {
2283 tail_part_num
= new_tail
;
2286 if (new_head
> fifo
->info
.head_part_num
) {
2287 head_part_num
= new_head
;
2290 if (new_max
> fifo
->info
.max_push_part_num
) {
2291 max_part_num
= new_max
;
2295 if (processed
.empty() &&
2298 /* nothing to update anymore */
2299 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2300 << " nothing to update any more: race_retries="
2301 << race_retries
<< " tid=" << tid
<< dendl
;
2302 complete(std::move(p
), 0);
2305 state
= pp_callback
;
2306 fifo
->_update_meta(dpp
, fifo::update
{}
2307 .tail_part_num(tail_part_num
)
2308 .head_part_num(head_part_num
)
2309 .max_push_part_num(max_part_num
)
2310 .journal_entries_rm(processed
),
2311 objv
, &this->canceled
, tid
, call(std::move(p
)));
2315 JournalProcessor(const JournalProcessor
&) = delete;
2316 JournalProcessor
& operator =(const JournalProcessor
&) = delete;
2317 JournalProcessor(JournalProcessor
&&) = delete;
2318 JournalProcessor
& operator =(JournalProcessor
&&) = delete;
// Walk the journal from the current position `iter` up to journal.end(),
// acting on each entry.
//
// dpp: prefix provider used for the debug log lines.
// p:   owning pointer that is moved into whichever call continues or
//      finishes the operation (create_part, remove_part, complete or
//      postprocess).
//
// NOTE(review): the branching that selects between the calls below (and any
// early returns after them) is on lines that are not visible in this
// extract; confirm against the full file before relying on the ordering.
2320 void process(const DoutPrefixProvider
*dpp
, Ptr
&& p
) {
2321 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2322 << " entering: tid=" << tid
<< dendl
;
2323 while (iter
!= journal
.end()) {
2324 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2325 << " processing entry: entry=" << *iter
2326 << " tid=" << tid
<< dendl
;
// Work on a copy of the entry the iterator currently refers to.
2327 const auto entry
= *iter
;
2329 using enum fifo::journal_entry::Op
;
// Create the part named by this entry; ownership of p goes with the call.
2331 create_part(dpp
, std::move(p
), entry
.part_num
);
// Keep new_head at the largest part number seen so far.
2334 if (entry
.part_num
> new_head
) {
2335 new_head
= entry
.part_num
;
// Remember this entry in `processed`.
2337 processed
.push_back(entry
);
// Remove the part named by this entry; ownership of p goes with the call.
2341 remove_part(dpp
, std::move(p
), entry
.part_num
);
// Unrecognized journal op: log it at level -1 and finish with -EIO.
2344 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2345 << " unknown journaled op: entry=" << entry
<< " tid="
2347 complete(std::move(p
), -EIO
);
// Hand off to postprocess() (presumably once the loop has run out of
// entries -- the loop's closing brace is not visible here).
2351 postprocess(dpp
, std::move(p
));
// Callback entry point taking the result code `r` of a previous step.
//
// dpp: prefix provider used for the debug log line.
// p:   owning pointer forwarded to the next step.
// r:   result code passed through to finish_je() or pp_run().
//
// NOTE(review): only the `entry_callback` case label is visible in this
// extract; the switch expression and the label guarding the pp_run() call
// are on missing lines.
2355 void handle(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
) {
2356 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2357 << " entering: tid=" << tid
<< dendl
;
// A per-entry operation finished: pass its result and the current entry on.
2359 case entry_callback
:
2360 finish_je(dpp
, std::move(p
), r
, *iter
);
// Otherwise continue in pp_run() with the result code and `c`.
2365 pp_run(dpp
, std::move(p
), r
, c
);
// Start journal processing for this FIFO.
//
// dpp: prefix provider, passed to the JournalProcessor and to process().
// tid: identifier passed to the JournalProcessor.
// c:   completion passed to the JournalProcessor.
//
// A JournalProcessor is heap-allocated and then handed to its own process()
// method. process() declares its pointer parameter as `Ptr&&`, so the
// std::move(p) here only binds a reference at the call site.
2374 void FIFO::process_journal(const DoutPrefixProvider
*dpp
, std::uint64_t tid
, lr::AioCompletion
* c
) {
2375 auto p
= std::make_unique
<JournalProcessor
>(dpp
, this, tid
, c
);
2376 p
->process(dpp
, std::move(p
));
// Completion-based helper that lists FIFO entries one part at a time,
// accumulating them in `result` and handing them to the caller's output
// vector when it completes.
// NOTE(review): several members used by the methods (f, ofs, max_entries,
// more, more_out, r_out, tid) are declared on lines not visible in this
// extract.
2379 struct Lister
: Completion
<Lister
> {
// Entries gathered so far; moved into *entries_out by complete().
2381 std::vector
<list_entry
> result
;
// Number of the part currently being listed.
2383 std::int64_t part_num
;
// Buffer handed to list_part() to receive one batch of part entries.
2387 std::vector
<fifo::part_list_entry
> entries
;
// Flags handed to list_part() by address; handle_list() combines them
// into `more`.
2388 bool part_more
= false;
2389 bool part_full
= false;
// Caller-supplied destination for the results; may be null (complete()
// checks before writing).
2390 std::vector
<list_entry
>* entries_out
;
// Publish the outputs and finish the operation with result `r`.
//
// p: owning pointer forwarded to Completion::complete().
// r: result code forwarded to Completion::complete().
//
// Each output pointer is written only if it is non-null.
// NOTE(review): lines are missing around the two writes in this extract;
// there may be an additional guard (e.g. on r) -- confirm in the full file.
2396 void complete(Ptr
&& p
, int r
) {
2398 if (more_out
) *more_out
= more
;
2399 if (entries_out
) *entries_out
= std::move(result
);
2401 Completion::complete(std::move(p
), r
);
// Construct a Lister.
//
// dpp, super:  forwarded to the Completion base class.
// f:           FIFO being listed.
// part_num:    part to start listing from.
// ofs:         offset passed to list_part().
// max_entries: number of entries requested; also used to pre-size `result`.
// entries_out: where complete() stores the results (may be null).
// more_out:    where complete() stores the `more` flag (may be null).
// tid:         identifier passed to list_part() and read_meta().
2405 Lister(const DoutPrefixProvider
*dpp
, FIFO
* f
, std::int64_t part_num
, std::uint64_t ofs
, int max_entries
,
2406 std::vector
<list_entry
>* entries_out
, bool* more_out
,
2407 std::uint64_t tid
, lr::AioCompletion
* super
)
2408 : Completion(dpp
, super
), f(f
), part_num(part_num
), ofs(ofs
), max_entries(max_entries
),
2409 entries_out(entries_out
), more_out(more_out
), tid(tid
) {
// Pre-allocate room for the requested number of entries.
// NOTE(review): max_entries is a signed int; a negative value would be
// converted to a very large size here -- confirm callers never pass one.
2410 result
.reserve(max_entries
);
// Lister is neither copyable nor movable: copy construction, copy
// assignment, move construction and move assignment are all deleted.
2413 Lister(const Lister
&) = delete;
2414 Lister
& operator =(const Lister
&) = delete;
2415 Lister(Lister
&&) = delete;
2416 Lister
& operator =(Lister
&&) = delete;
// Callback entry point: forwards the result `r` to handle_read() or to
// handle_list().
// NOTE(review): the condition choosing between the two calls is on lines
// not visible in this extract.
2418 void handle(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
) {
2420 handle_read(std::move(p
), r
);
2422 handle_list(dpp
, std::move(p
), r
);
// Issue the next asynchronous part listing, or finish if nothing more is
// wanted.
//
// p: owning pointer, moved into the completion for the async operation or
//    into complete().
2425 void list(Ptr
&& p
) {
2426 if (max_entries
> 0) {
// Look up the object name of the current part while holding the FIFO mutex.
2431 std::unique_lock
l(f
->m
);
2432 auto part_oid
= f
->info
.part_oid(part_num
);
// Build the list operation; its outputs land in r_out, entries, part_more
// and part_full, which handle_list() reads afterwards.
2436 auto op
= list_part(f
->cct
, ofs
, max_entries
, &r_out
,
2437 &entries
, &part_more
, &part_full
, tid
);
2438 f
->ioctx
.aio_operate(part_oid
, call(std::move(p
)), &op
, nullptr);
// Finish with success (presumably the branch taken when max_entries is
// not positive -- the `else` is not visible in this extract).
2440 complete(std::move(p
), 0);
// Handle the result of the step that handle_list() started via
// f->read_meta().
//
// p: owning pointer forwarded to complete() (or to a restart on lines not
//    visible here).
// r: result code of the preceding operation.
//
// NOTE(review): the conditions guarding the complete() calls and whatever
// follows the tail_part_num adjustment are on missing lines.
2444 void handle_read(Ptr
&& p
, int r
) {
// On success, take the result code from r_out instead.
2446 if (r
>= 0) r
= r_out
;
2450 complete(std::move(p
), r
);
2454 if (part_num
< f
->info
.tail_part_num
) {
2455 /* raced with trim? restart */
// Add the number of entries already gathered back onto the budget and
// move to the current tail part.
2456 max_entries
+= result
.size();
2458 part_num
= f
->info
.tail_part_num
;
2463 /* assuming part was not written yet, so end of data */
2465 complete(std::move(p
), 0);
// Handle the result of a part listing issued by list().
//
// dpp: prefix provider passed to read_meta().
// p:   owning pointer forwarded to the next step or to complete().
// r:   result code of the asynchronous operation.
//
// NOTE(review): the conditions guarding read_meta() and the error
// complete(), plus the statements after the last visible line, are missing
// from this extract.
2469 void handle_list(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
) {
// On success, take the result code from r_out instead.
2470 if (r
>= 0) r
= r_out
;
2472 std::unique_lock
l(f
->m
);
2473 auto part_oid
= f
->info
.part_oid(part_num
);
// Re-read the FIFO metadata; this object is passed as the completion.
2477 f
->read_meta(dpp
, tid
, call(std::move(p
)));
// Finish with the result code r.
2481 complete(std::move(p
), r
);
// There may be more to list if the part is full or has further entries.
2485 more
= part_full
|| part_more
;
// Convert each part entry into a list_entry, building its marker from the
// part number and the entry's offset.
2486 for (auto& entry
: entries
) {
2488 e
.data
= std::move(entry
.data
);
2489 e
.marker
= marker
{part_num
, entry
.ofs
}.to_string();
2490 e
.mtime
= entry
.mtime
;
2491 result
.push_back(std::move(e
));
// Reduce the remaining budget by the size of this batch.
2493 max_entries
-= entries
.size();
2495 if (max_entries
> 0 && part_more
) {
2500 if (!part_full
) { /* head part is not full */
2501 complete(std::move(p
), 0);
// Asynchronously list up to max_entries entries from the FIFO.
//
// dpp:         prefix provider passed to the Lister.
// max_entries: number of entries requested.
// markstr:     optional marker string to start from; parsed with
//              to_marker().
// out:         destination vector passed to the Lister.
// c:           completion passed to the Lister.
//
// If a marker string was supplied but did not parse, the operation is
// completed with -EINVAL; otherwise listing is started.
// NOTE(review): one parameter, the remaining Lister constructor arguments
// and the declaration of the `l` used for the -EINVAL completion are on
// lines not visible in this extract.
2510 void FIFO::list(const DoutPrefixProvider
*dpp
, int max_entries
,
2511 std::optional
<std::string_view
> markstr
,
2512 std::vector
<list_entry
>* out
,
2514 lr::AioCompletion
* c
) {
// Under the FIFO mutex: allocate a new tid and default to the tail part.
2515 std::unique_lock
l(m
);
2516 auto tid
= ++next_tid
;
2517 std::int64_t part_num
= info
.tail_part_num
;
2519 std::uint64_t ofs
= 0;
2520 std::optional
<::rgw::cls::fifo::marker
> marker
;
// Parse the caller's marker and start from the part it names.
2523 marker
= to_marker(*markstr
);
2525 part_num
= marker
->num
;
2530 auto ls
= std::make_unique
<Lister
>(dpp
, this, part_num
, ofs
, max_entries
, out
,
// A marker string was given but could not be parsed.
2532 if (markstr
&& !marker
) {
2534 l
->complete(std::move(ls
), -EINVAL
);
2536 ls
->list(std::move(ls
));