1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2020 Red Hat <contact@redhat.com>
7 * Author: Adam C. Emerson
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
19 #include <string_view>
21 #undef FMT_HEADER_ONLY
22 #define FMT_HEADER_ONLY 1
23 #include <fmt/format.h>
25 #include "include/rados/librados.hpp"
27 #include "include/buffer.h"
29 #include "common/async/yield_context.h"
30 #include "common/random_string.h"
32 #include "cls/fifo/cls_fifo_types.h"
33 #include "cls/fifo/cls_fifo_ops.h"
35 #include "cls_fifo_legacy.h"
37 namespace rgw::cls::fifo
{
// Namespace-scope settings shared by the rest of this file.
// dout_subsys names the log subsystem; presumably it is picked up by the
// ldout/lderr/ldpp_dout macros used below -- confirm.
38 static constexpr auto dout_subsys
= ceph_subsys_objclass
;
// Short alias for the buffer namespace (cb::list and cb::error are used below).
39 namespace cb
= ceph::buffer
;
// Short alias for the FIFO object-class types and ops (fifo::info, fifo::op::*).
40 namespace fifo
= rados::cls::fifo
;
// Used by the decode-failure handlers below to turn an error code into an int.
42 using ceph::from_error_code
;
// Upper bound for the `canceled && i < MAX_RACE_RETRIES` retry loops below.
44 inline constexpr auto MAX_RACE_RETRIES
= 10;
// Queue a fifo::op::CREATE_META class call on `op`.
// A fifo::op::create_meta request is filled from the arguments (the
// assignments of oid_prefix, max_part_size, max_entry_size and exclusive are
// visible below) and the buffer `in` is handed to op->exec().
// This function only appends to `op`; it does not submit it.
// NOTE(review): the declarations of `exclusive` and `in` are not visible in
// this view -- `in` is presumably the encoded request; confirm.
46 void create_meta(lr::ObjectWriteOperation
* op
,
48 std::optional
<fifo::objv
> objv
,
49 std::optional
<std::string_view
> oid_prefix
,
51 std::uint64_t max_part_size
,
52 std::uint64_t max_entry_size
)
54 fifo::op::create_meta cm
;
58 cm
.oid_prefix
= oid_prefix
;
59 cm
.max_part_size
= max_part_size
;
60 cm
.max_entry_size
= max_entry_size
;
61 cm
.exclusive
= exclusive
;
65 op
->exec(fifo::op::CLASS
, fifo::op::CREATE_META
, in
);
// Read FIFO metadata from object `oid` with the fifo::op::GET_META class call.
//
// The read operation is run through rgw_rados_operate(); its result is kept
// in `r`. When the reply decodes, each non-null out-pointer (info,
// part_header_size, part_entry_overhead) receives the matching reply field.
// A cb::error thrown while decoding is logged and replaces `r` with
// from_error_code(err.code()).
// A failed operation is logged unless `probe` is set and r is -ENOENT or
// -ENODATA. `tid` only appears in log messages in the lines shown here.
// NOTE(review): `probe`, `in` and `bl` are declared on lines not visible in
// this view.
68 int get_meta(const DoutPrefixProvider
*dpp
, lr::IoCtx
& ioctx
, const std::string
& oid
,
69 std::optional
<fifo::objv
> objv
, fifo::info
* info
,
70 std::uint32_t* part_header_size
,
71 std::uint32_t* part_entry_overhead
,
72 uint64_t tid
, optional_yield y
,
75 lr::ObjectReadOperation op
;
76 fifo::op::get_meta gm
;
82 op
.exec(fifo::op::CLASS
, fifo::op::GET_META
, in
,
84 auto r
= rgw_rados_operate(dpp
, ioctx
, oid
, &op
, nullptr, y
);
86 fifo::op::get_meta_reply reply
;
87 auto iter
= bl
.cbegin();
// Every output is optional: null pointers are simply skipped.
89 if (info
) *info
= std::move(reply
.info
);
90 if (part_header_size
) *part_header_size
= reply
.part_header_size
;
91 if (part_entry_overhead
)
92 *part_entry_overhead
= reply
.part_entry_overhead
;
93 } catch (const cb::error
& err
) {
95 << __PRETTY_FUNCTION__
<< ":" << __LINE__
96 << " decode failed: " << err
.what()
97 << " tid=" << tid
<< dendl
;
98 r
= from_error_code(err
.code());
// When probing, a missing object (-ENOENT) or missing data (-ENODATA) is not
// reported as an error in the log.
99 } else if (!(probe
&& (r
== -ENOENT
|| r
== -ENODATA
))) {
101 << __PRETTY_FUNCTION__
<< ":" << __LINE__
102 << " fifo::op::GET_META failed r=" << r
<< " tid=" << tid
// Queue a fifo::op::UPDATE_META class call on `op`.
// The tail/head/min_push/max_push part numbers and the journal entries to
// add and remove are copied from `update` into a fifo::op::update_meta, and
// the buffer `in` is handed to op->exec(). Nothing is submitted here.
// NOTE(review): `update` is a const reference, so std::move(update) yields a
// const rvalue; the journal entry vectors are presumably copied rather than
// moved -- confirm against fifo::update's accessors.
109 void update_meta(lr::ObjectWriteOperation
* op
, const fifo::objv
& objv
,
110 const fifo::update
& update
)
112 fifo::op::update_meta um
;
115 um
.tail_part_num
= update
.tail_part_num();
116 um
.head_part_num
= update
.head_part_num();
117 um
.min_push_part_num
= update
.min_push_part_num();
118 um
.max_push_part_num
= update
.max_push_part_num();
119 um
.journal_entries_add
= std::move(update
).journal_entries_add();
120 um
.journal_entries_rm
= std::move(update
).journal_entries_rm();
124 op
->exec(fifo::op::CLASS
, fifo::op::UPDATE_META
, in
);
// Queue a fifo::op::INIT_PART class call on `op`.
// `tag` and `params` are the inputs for the fifo::op::init_part request; the
// lines that populate the request and the buffer `in` are not visible in this
// view. Nothing is submitted here; FIFO::create_part() submits the op.
127 void part_init(lr::ObjectWriteOperation
* op
, std::string_view tag
,
128 fifo::data_params params
)
130 fifo::op::init_part ip
;
137 op
->exec(fifo::op::CLASS
, fifo::op::INIT_PART
, in
);
// Synchronously push `data_bufs` to part object `oid` with the
// fifo::op::PUSH_PART class call.
//
// pp.total_len is accumulated from the lengths of all buffers. The call is
// queued with a pointer to `retval` and the op is run through
// rgw_rados_operate() with lr::OPERATION_RETURNVEC.
// Two failures are logged with `tid`: a failed operation (r) and an error in
// the response (retval).
// NOTE(review): `retval` presumably receives the class method's own return
// value via exec()'s last argument -- confirm against the librados API.
140 int push_part(const DoutPrefixProvider
*dpp
, lr::IoCtx
& ioctx
, const std::string
& oid
, std::string_view tag
,
141 std::deque
<cb::list
> data_bufs
, std::uint64_t tid
,
144 lr::ObjectWriteOperation op
;
145 fifo::op::push_part pp
;
148 pp
.data_bufs
= data_bufs
;
// Sum of the payload sizes.
151 for (const auto& bl
: data_bufs
)
152 pp
.total_len
+= bl
.length();
157 op
.exec(fifo::op::CLASS
, fifo::op::PUSH_PART
, in
, nullptr, &retval
);
158 auto r
= rgw_rados_operate(dpp
, ioctx
, oid
, &op
, y
, lr::OPERATION_RETURNVEC
);
161 << __PRETTY_FUNCTION__
<< ":" << __LINE__
162 << " fifo::op::PUSH_PART failed r=" << r
163 << " tid=" << tid
<< dendl
;
168 << __PRETTY_FUNCTION__
<< ":" << __LINE__
169 << " error handling response retval=" << retval
170 << " tid=" << tid
<< dendl
;
// Asynchronous variant of push_part: builds the same fifo::op::PUSH_PART
// request (buffers plus their summed length) and submits it with
// ioctx.aio_operate() using completion `c` and lr::OPERATION_RETURNVEC.
// Unlike the synchronous overload, exec() is called without output arguments.
// NOTE(review): what is done with `r` after aio_operate() is not visible in
// this view.
175 void push_part(lr::IoCtx
& ioctx
, const std::string
& oid
, std::string_view tag
,
176 std::deque
<cb::list
> data_bufs
, std::uint64_t tid
,
177 lr::AioCompletion
* c
)
179 lr::ObjectWriteOperation op
;
180 fifo::op::push_part pp
;
183 pp
.data_bufs
= data_bufs
;
186 for (const auto& bl
: data_bufs
)
187 pp
.total_len
+= bl
.length();
191 op
.exec(fifo::op::CLASS
, fifo::op::PUSH_PART
, in
);
192 auto r
= ioctx
.aio_operate(oid
, c
, &op
, lr::OPERATION_RETURNVEC
);
// Queue a fifo::op::TRIM_PART class call on `op`.
// `exclusive` is copied into the fifo::op::trim_part request below; how `tag`
// and `ofs` are stored is on lines not visible in this view. Nothing is
// submitted here; the FIFO::trim_part() overloads submit the op.
196 void trim_part(lr::ObjectWriteOperation
* op
,
197 std::optional
<std::string_view
> tag
,
198 std::uint64_t ofs
, bool exclusive
)
200 fifo::op::trim_part tp
;
204 tp
.exclusive
= exclusive
;
208 op
->exec(fifo::op::CLASS
, fifo::op::TRIM_PART
, in
);
// Synchronously list entries of part object `oid` with the
// fifo::op::LIST_PART class call.
//
// max_entries is copied into the request. The op is run through
// rgw_rados_operate() and its result kept in `r`. When the reply decodes,
// each non-null out-pointer (entries, more, full_part, ptag) receives the
// matching reply field. A cb::error thrown while decoding is logged and
// replaces `r` with from_error_code(err.code()).
// A failed operation is logged unless r is -ENOENT.
// `tid` only appears in log messages in the lines shown here.
211 int list_part(const DoutPrefixProvider
*dpp
, lr::IoCtx
& ioctx
, const std::string
& oid
,
212 std::optional
<std::string_view
> tag
, std::uint64_t ofs
,
213 std::uint64_t max_entries
,
214 std::vector
<fifo::part_list_entry
>* entries
,
215 bool* more
, bool* full_part
, std::string
* ptag
,
216 std::uint64_t tid
, optional_yield y
)
218 lr::ObjectReadOperation op
;
219 fifo::op::list_part lp
;
223 lp
.max_entries
= max_entries
;
228 op
.exec(fifo::op::CLASS
, fifo::op::LIST_PART
, in
, &bl
, nullptr);
229 auto r
= rgw_rados_operate(dpp
, ioctx
, oid
, &op
, nullptr, y
);
231 fifo::op::list_part_reply reply
;
232 auto iter
= bl
.cbegin();
// Every output is optional: null pointers are simply skipped.
234 if (entries
) *entries
= std::move(reply
.entries
);
235 if (more
) *more
= reply
.more
;
236 if (full_part
) *full_part
= reply
.full_part
;
237 if (ptag
) *ptag
= reply
.tag
;
238 } catch (const cb::error
& err
) {
240 << __PRETTY_FUNCTION__
<< ":" << __LINE__
241 << " decode failed: " << err
.what()
242 << " tid=" << tid
<< dendl
;
243 r
= from_error_code(err
.code());
// -ENOENT is not logged as a failure.
244 } else if (r
!= -ENOENT
) {
246 << __PRETTY_FUNCTION__
<< ":" << __LINE__
247 << " fifo::op::LIST_PART failed r=" << r
<< " tid=" << tid
// Completion object for the asynchronous LIST_PART operation built by
// list_part(CephContext*, ...) below.
//
// It stores the caller's out-pointers. handle_completion() decodes a
// fifo::op::list_part_reply from `bl`, copies the reply fields into every
// non-null out-pointer, and finally stores the result code in *r_out when
// r_out is non-null. A decode error is logged and turns `r` into
// from_error_code(err.code()).
253 struct list_entry_completion
: public lr::ObjectOperationCompletion
{
256 std::vector
<fifo::part_list_entry
>* entries
;
262 list_entry_completion(CephContext
* cct
, int* r_out
, std::vector
<fifo::part_list_entry
>* entries
,
263 bool* more
, bool* full_part
, std::string
* ptag
,
265 : cct(cct
), r_out(r_out
), entries(entries
), more(more
),
266 full_part(full_part
), ptag(ptag
), tid(tid
) {}
267 virtual ~list_entry_completion() = default;
// Invoked with the operation's result `r` and its output buffer `bl`.
268 void handle_completion(int r
, bufferlist
& bl
) override
{
270 fifo::op::list_part_reply reply
;
271 auto iter
= bl
.cbegin();
273 if (entries
) *entries
= std::move(reply
.entries
);
274 if (more
) *more
= reply
.more
;
275 if (full_part
) *full_part
= reply
.full_part
;
276 if (ptag
) *ptag
= reply
.tag
;
277 } catch (const cb::error
& err
) {
279 << __PRETTY_FUNCTION__
<< ":" << __LINE__
280 << " decode failed: " << err
.what()
281 << " tid=" << tid
<< dendl
;
282 r
= from_error_code(err
.code());
285 << __PRETTY_FUNCTION__
<< ":" << __LINE__
286 << " fifo::op::LIST_PART failed r=" << r
<< " tid=" << tid
// Report the (possibly rewritten) result code to the caller.
289 if (r_out
) *r_out
= r
;
// Build an lr::ObjectReadOperation that performs fifo::op::LIST_PART and
// delivers its results through a list_entry_completion.
//
// max_entries is copied into the request. The completion is heap-allocated
// with `new` and receives the caller's out-pointers (r_out, entries, more,
// full_part, ...).
// NOTE(review): the completion is presumably owned and freed by the
// operation after it fires -- confirm against ObjectReadOperation::exec().
293 lr::ObjectReadOperation
list_part(CephContext
* cct
,
294 std::optional
<std::string_view
> tag
,
296 std::uint64_t max_entries
,
298 std::vector
<fifo::part_list_entry
>* entries
,
299 bool* more
, bool* full_part
,
300 std::string
* ptag
, std::uint64_t tid
)
302 lr::ObjectReadOperation op
;
303 fifo::op::list_part lp
;
307 lp
.max_entries
= max_entries
;
311 op
.exec(fifo::op::CLASS
, fifo::op::LIST_PART
, in
,
312 new list_entry_completion(cct
, r_out
, entries
, more
, full_part
,
// Synchronously fetch the header of part object `oid` with the
// fifo::op::GET_PART_INFO class call.
//
// The op is run through rgw_rados_operate() and its result kept in `r`.
// When the reply decodes and `header` is non-null, *header receives
// reply.header. A cb::error thrown while decoding is logged and replaces `r`
// with from_error_code(err.code()); a failed operation is logged as well.
// `tid` only appears in log messages in the lines shown here.
317 int get_part_info(const DoutPrefixProvider
*dpp
, lr::IoCtx
& ioctx
, const std::string
& oid
,
318 fifo::part_header
* header
,
319 std::uint64_t tid
, optional_yield y
)
321 lr::ObjectReadOperation op
;
322 fifo::op::get_part_info gpi
;
327 op
.exec(fifo::op::CLASS
, fifo::op::GET_PART_INFO
, in
, &bl
, nullptr);
328 auto r
= rgw_rados_operate(dpp
, ioctx
, oid
, &op
, nullptr, y
);
330 fifo::op::get_part_info_reply reply
;
331 auto iter
= bl
.cbegin();
333 if (header
) *header
= std::move(reply
.header
);
334 } catch (const cb::error
& err
) {
336 << __PRETTY_FUNCTION__
<< ":" << __LINE__
337 << " decode failed: " << err
.what()
338 << " tid=" << tid
<< dendl
;
339 r
= from_error_code(err
.code());
342 << __PRETTY_FUNCTION__
<< ":" << __LINE__
343 << " fifo::op::GET_PART_INFO failed r=" << r
<< " tid=" << tid
// Completion object for the asynchronous GET_PART_INFO operation built by
// get_part_info(CephContext*, ...) below.
//
// It stores the caller's result pointer `rp` and header pointer `h`.
// handle_completion() decodes a fifo::op::get_part_info_reply from `bl` and,
// when `h` is non-null, moves reply.header into *h. A decode error turns `r`
// into from_error_code(err.code()) and is logged; a failed operation is
// logged too.
// NOTE(review): the store of `r` through `rp` is not visible in this view.
349 struct partinfo_completion
: public lr::ObjectOperationCompletion
{
352 fifo::part_header
* h
;
354 partinfo_completion(CephContext
* cct
, int* rp
, fifo::part_header
* h
,
356 cct(cct
), rp(rp
), h(h
), tid(tid
) {
358 virtual ~partinfo_completion() = default;
// Invoked with the operation's result `r` and its output buffer `bl`.
359 void handle_completion(int r
, bufferlist
& bl
) override
{
361 fifo::op::get_part_info_reply reply
;
362 auto iter
= bl
.cbegin();
364 if (h
) *h
= std::move(reply
.header
);
365 } catch (const cb::error
& err
) {
366 r
= from_error_code(err
.code());
367 lderr(cct
) << __PRETTY_FUNCTION__
<< ":" << __LINE__
368 << " decode failed: " << err
.what()
369 << " tid=" << tid
<< dendl
;
371 lderr(cct
) << __PRETTY_FUNCTION__
<< ":" << __LINE__
372 << " fifo::op::GET_PART_INFO failed r=" << r
<< " tid=" << tid
// Build an lr::ObjectReadOperation that performs fifo::op::GET_PART_INFO and
// delivers its results through a partinfo_completion.
//
// cct    - passed to the completion, which uses it for logging.
// header - out-pointer handed to the completion; it is null-checked there
//          before being written.
// tid    - passed to the completion; it appears in its log messages.
// r      - optional out-pointer for the result code; defaults to nullptr.
//
// The completion is heap-allocated with `new`.
// NOTE(review): it is presumably owned and freed by the operation after it
// fires -- confirm against ObjectReadOperation::exec().
381 lr::ObjectReadOperation
get_part_info(CephContext
* cct
,
382 fifo::part_header
* header
,
// The default is a null pointer; spell it nullptr rather than the integer 0.
383 std::uint64_t tid
, int* r
= nullptr)
385 lr::ObjectReadOperation op
;
386 fifo::op::get_part_info gpi
;
391 op
.exec(fifo::op::CLASS
, fifo::op::GET_PART_INFO
, in
,
392 new partinfo_completion(cct
, r
, header
, tid
));
// Convert a textual marker `s` into a `marker`, or an empty optional.
//
// The general form handled below is "<num>:<ofs>": the string is split at
// the first ':' and each half is parsed with ceph::parse into the type of
// the corresponding marker field.
// NOTE(review): one path (its condition is not visible in this view) sets
// m.num from info.tail_part_num instead of parsing; a string without ':' is
// presumably rejected -- confirm.
397 std::optional
<marker
> FIFO::to_marker(std::string_view s
)
401 m
.num
= info
.tail_part_num
;
406 auto pos
= s
.find(':');
407 if (pos
== string::npos
) {
// Text before the separator is the part number, text after it the offset.
411 auto num
= s
.substr(0, pos
);
412 auto ofs
= s
.substr(pos
+ 1);
414 auto n
= ceph::parse
<decltype(m
.num
)>(num
);
419 auto o
= ceph::parse
<decltype(m
.ofs
)>(ofs
);
// Produce a random tag string via gen_rand_alphanumeric_plain(), using the
// CephContext obtained from ioctx.cct().
// NOTE(review): HEADER_TAG_SIZE (16) is presumably the requested tag length;
// the argument line is not visible in this view.
427 std::string
FIFO::generate_tag() const
429 static constexpr auto HEADER_TAG_SIZE
= 16;
430 return gen_rand_alphanumeric_plain(static_cast<CephContext
*>(ioctx
.cct()),
// Apply `update` to the in-memory `*info`, but only if `objv` still matches
// info->version.
//
// The mutex `m` is taken before the comparison. On a version mismatch the
// update is not applied and "version mismatch, canceling" is logged.
// Otherwise info->apply_update(update) runs; the error it reports, if any, is
// logged as "error applying update".
// NOTE(review): the return statements are not visible in this view; callers
// treat a negative result as "canceled".
435 int FIFO::apply_update(fifo::info
* info
,
436 const fifo::objv
& objv
,
437 const fifo::update
& update
,
440 ldout(cct
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
441 << " entering: tid=" << tid
<< dendl
;
442 std::unique_lock
l(m
);
// Someone else changed the metadata since the caller took its snapshot.
443 if (objv
!= info
->version
) {
444 lderr(cct
) << __PRETTY_FUNCTION__
<< ":" << __LINE__
445 << " version mismatch, canceling: tid=" << tid
<< dendl
;
448 auto err
= info
->apply_update(update
);
450 lderr(cct
) << __PRETTY_FUNCTION__
<< ":" << __LINE__
451 << " error applying update: " << *err
<< " tid=" << tid
<< dendl
;
// Synchronously send `update` to the metadata object and mirror it locally.
//
// update    - the change to apply.
// version   - the metadata version the caller based `update` on (callers
//             snapshot info.version while holding `m`).
// pcanceled - optional; receives whether the update lost a race.
// tid       - only used in log messages.
//
// The write op is built with the caller's `version`, the same value that
// apply_update() compares against afterwards. Reading info.version here
// instead would be an unsynchronized read of `info`, and it could differ from
// the caller's snapshot, so the op and the local check could disagree.
//
// Flow: run the op; -ECANCELED marks the update as canceled. Otherwise the
// update is applied to the local `info`, and a failure there marks it
// canceled too. read_meta() is then called to refresh the local metadata;
// afterwards `canceled` is true only if that re-read succeeded.
// NOTE(review): the conditions guarding apply_update() and read_meta() are
// not visible in this view.
460 int FIFO::_update_meta(const DoutPrefixProvider
*dpp
, const fifo::update
& update
,
461 fifo::objv version
, bool* pcanceled
,
462 std::uint64_t tid
, optional_yield y
)
464 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
465 << " entering: tid=" << tid
<< dendl
;
466 lr::ObjectWriteOperation op
;
467 bool canceled
= false;
// Use the caller's snapshot, not the live (unlocked) info.version.
468 update_meta(&op
, version
, update
);
469 auto r
= rgw_rados_operate(dpp
, ioctx
, oid
, &op
, y
);
470 if (r
>= 0 || r
== -ECANCELED
) {
471 canceled
= (r
== -ECANCELED
);
473 r
= apply_update(&info
, version
, update
, tid
);
474 if (r
< 0) canceled
= true;
477 r
= read_meta(dpp
, tid
, y
);
// A failed re-read is returned as an error rather than as "canceled".
478 canceled
= r
< 0 ? false : true;
481 if (pcanceled
) *pcanceled
= canceled
;
483 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
484 << " canceled: tid=" << tid
<< dendl
;
487 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
488 << " returning error: r=" << r
<< " tid=" << tid
<< dendl
;
// Completion state for the asynchronous FIFO::_update_meta().
//
// handle() forwards to handle_reread() or handle_update(); the flag that
// selects between them is on a line not visible in this view.
//
// handle_update(): a result that is negative and not -ECANCELED is logged and
// completes with that error. Otherwise the update is applied to the local
// metadata through fifo->apply_update(); when that fails the update is
// treated as canceled. The canceled path re-reads the metadata via
// fifo->read_meta(), re-arming this object with call(); the other path
// completes with 0.
//
// handle_reread(): handles the result of that re-read, branching on the sign
// of `r` when `pcanceled` is non-null (the branch bodies are not visible
// here), logs a failure, and completes with `r`.
493 struct Updater
: public Completion
<Updater
> {
498 bool* pcanceled
= nullptr;
500 Updater(const DoutPrefixProvider
*dpp
, FIFO
* fifo
, lr::AioCompletion
* super
,
501 const fifo::update
& update
, fifo::objv version
,
502 bool* pcanceled
, std::uint64_t tid
)
503 : Completion(dpp
, super
), fifo(fifo
), update(update
), version(version
),
504 pcanceled(pcanceled
) {}
// Entry point invoked when the pending operation completes with result `r`.
506 void handle(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
) {
507 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
508 << " entering: tid=" << tid
<< dendl
;
510 handle_reread(std::move(p
), r
);
512 handle_update(dpp
, std::move(p
), r
);
// Handles completion of the update_meta write.
515 void handle_update(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
) {
516 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
517 << " handling async update_meta: tid="
// -ECANCELED is not a hard failure; it is handled as a lost race below.
519 if (r
< 0 && r
!= -ECANCELED
) {
520 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
521 << " update failed: r=" << r
<< " tid=" << tid
<< dendl
;
522 complete(std::move(p
), r
);
525 bool canceled
= (r
== -ECANCELED
);
527 int r
= fifo
->apply_update(&fifo
->info
, version
, update
, tid
);
529 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
530 << " update failed, marking canceled: r=" << r
531 << " tid=" << tid
<< dendl
;
// Re-read the metadata; this object is re-armed as the completion.
537 fifo
->read_meta(dpp
, tid
, call(std::move(p
)));
542 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
543 << " completing: tid=" << tid
<< dendl
;
544 complete(std::move(p
), 0);
// Handles completion of the metadata re-read issued by handle_update().
547 void handle_reread(Ptr
&& p
, int r
) {
548 ldout(fifo
->cct
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
549 << " handling async read_meta: tid="
551 if (r
< 0 && pcanceled
) {
553 } else if (r
>= 0 && pcanceled
) {
557 lderr(fifo
->cct
) << __PRETTY_FUNCTION__
<< ":" << __LINE__
558 << " failed dispatching read_meta: r=" << r
<< " tid="
561 ldout(fifo
->cct
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
562 << " completing: tid=" << tid
<< dendl
;
564 complete(std::move(p
), r
);
// Asynchronous variant of _update_meta(): queue the metadata update and let
// an Updater finish the work when the write completes.
//
// update    - the change to apply.
// version   - the metadata version the caller based `update` on.
// pcanceled - optional; handed to the Updater, which null-checks it.
// tid       - used in log messages and handed to the Updater.
// c         - the caller's completion, wrapped by the Updater.
//
// The write op is built with the caller's `version`, the same value the
// Updater later passes to apply_update(). Reading info.version here instead
// would be an unsynchronized read of `info` and could differ from the
// snapshot the update was computed against.
// NOTE(review): Updater::call() presumably transfers ownership of the
// Updater to the returned AioCompletion -- confirm.
568 void FIFO::_update_meta(const DoutPrefixProvider
*dpp
, const fifo::update
& update
,
569 fifo::objv version
, bool* pcanceled
,
570 std::uint64_t tid
, lr::AioCompletion
* c
)
572 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
573 << " entering: tid=" << tid
<< dendl
;
574 lr::ObjectWriteOperation op
;
// Use the caller's snapshot, not the live (unlocked) info.version.
575 update_meta(&op
, version
, update
);
576 auto updater
= std::make_unique
<Updater
>(dpp
, this, c
, update
, version
, pcanceled
,
578 auto r
= ioctx
.aio_operate(oid
, Updater::call(std::move(updater
)), &op
);
// Create and initialise the part object for `part_num`.
//
// The write op first creates the object non-exclusively, then queues
// part_init() with `tag` and info.params. The part's object id comes from
// info.part_oid(part_num); `info` is read while holding `m`. The op is run
// through rgw_rados_operate() and a failure is logged as "part_init failed".
582 int FIFO::create_part(const DoutPrefixProvider
*dpp
, int64_t part_num
, std::string_view tag
, std::uint64_t tid
,
585 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
586 << " entering: tid=" << tid
<< dendl
;
587 lr::ObjectWriteOperation op
;
588 op
.create(false); /* We don't need exclusivity, part_init ensures
589 we're creating from the same journal entry. */
// `info` is read under the mutex.
590 std::unique_lock
l(m
);
591 part_init(&op
, tag
, info
.params
);
592 auto oid
= info
.part_oid(part_num
);
594 auto r
= rgw_rados_operate(dpp
, ioctx
, oid
, &op
, y
);
596 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
597 << " part_init failed: r=" << r
<< " tid="
// Remove the part object for `part_num`.
//
// The part's object id comes from info.part_oid(part_num), read while
// holding `m`. The op is run through rgw_rados_operate() and a failure is
// logged as "remove failed".
// NOTE(review): the line that queues the removal on `op` is not visible in
// this view.
603 int FIFO::remove_part(const DoutPrefixProvider
*dpp
, int64_t part_num
, std::string_view tag
, std::uint64_t tid
,
606 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
607 << " entering: tid=" << tid
<< dendl
;
608 lr::ObjectWriteOperation op
;
610 std::unique_lock
l(m
);
611 auto oid
= info
.part_oid(part_num
);
613 auto r
= rgw_rados_operate(dpp
, ioctx
, oid
, &op
, y
);
615 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
616 << " remove failed: r=" << r
<< " tid="
// Carry out every pending journal entry, then record the outcome in the
// metadata object.
//
// Phase 1: under `m`, snapshot the journal and the current tail / head /
// max-push part numbers. Each journal entry is then executed:
//   create   -> create_part(); raises new_max to the entry's part number.
//   set_head -> raises new_head to the entry's part number.
//   remove   -> remove_part(); -ENOENT counts as success; raises new_tail to
//               one past the entry's part number.
// An unknown op or a failing entry is logged. Entries that were handled are
// collected in `processed`.
//
// Phase 2: up to MAX_RACE_RETRIES times, build an update that removes the
// processed journal entries and advances only those pointers that are ahead
// of the current metadata, and send it with _update_meta(). When the update
// is canceled by a race, `processed` is pruned to the entries that are still
// present and unchanged in the re-read journal, and the loop retries.
//
// Fix: the tail check used to compare new_tail with the just-declared, empty
// optional `tail_part_num`. A value always compares greater than an empty
// optional, so the tail was sent on every pass, including after a re-read had
// shown a newer tail. It now compares with info.tail_part_num, like the head
// and max checks next to it.
622 int FIFO::process_journal(const DoutPrefixProvider
*dpp
, std::uint64_t tid
, optional_yield y
)
624 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
625 << " entering: tid=" << tid
<< dendl
;
626 std::vector
<fifo::journal_entry
> processed
;
// Snapshot the state under the mutex.
628 std::unique_lock
l(m
);
629 auto tmpjournal
= info
.journal
;
630 auto new_tail
= info
.tail_part_num
;
631 auto new_head
= info
.head_part_num
;
632 auto new_max
= info
.max_push_part_num
;
636 for (auto& [n
, entry
] : tmpjournal
) {
637 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
638 << " processing entry: entry=" << entry
<< " tid=" << tid
641 case fifo::journal_entry::Op::create
:
642 r
= create_part(dpp
, entry
.part_num
, entry
.part_tag
, tid
, y
);
643 if (entry
.part_num
> new_max
) {
644 new_max
= entry
.part_num
;
647 case fifo::journal_entry::Op::set_head
:
649 if (entry
.part_num
> new_head
) {
650 new_head
= entry
.part_num
;
653 case fifo::journal_entry::Op::remove
:
654 r
= remove_part(dpp
, entry
.part_num
, entry
.part_tag
, tid
, y
);
// An already-missing part counts as removed.
655 if (r
== -ENOENT
) r
= 0;
656 if (entry
.part_num
>= new_tail
) {
657 new_tail
= entry
.part_num
+ 1;
661 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
662 << " unknown journaled op: entry=" << entry
<< " tid="
668 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
669 << " processing entry failed: entry=" << entry
670 << " r=" << r
<< " tid=" << tid
<< dendl
;
674 processed
.push_back(std::move(entry
));
// Start as "canceled" so the retry loop below runs at least once.
678 bool canceled
= true;
680 for (auto i
= 0; canceled
&& i
< MAX_RACE_RETRIES
; ++i
) {
681 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
682 << " postprocessing: i=" << i
<< " tid=" << tid
<< dendl
;
// Left empty unless the corresponding pointer has to advance.
684 std::optional
<int64_t> tail_part_num
;
685 std::optional
<int64_t> head_part_num
;
686 std::optional
<int64_t> max_part_num
;
688 std::unique_lock
l(m
);
689 auto objv
= info
.version
;
// Compare with the current metadata, not with the empty optional above.
690 if (new_tail
> info
.tail_part_num
) tail_part_num
= new_tail
;
691 if (new_head
> info
.head_part_num
) head_part_num
= new_head
;
692 if (new_max
> info
.max_push_part_num
) max_part_num
= new_max
;
695 if (processed
.empty() &&
698 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
699 << " nothing to update any more: i=" << i
<< " tid="
704 auto u
= fifo::update().tail_part_num(tail_part_num
)
705 .head_part_num(head_part_num
).max_push_part_num(max_part_num
)
706 .journal_entries_rm(processed
);
707 r
= _update_meta(dpp
, u
, objv
, &canceled
, tid
, y
);
709 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
710 << " _update_meta failed: update=" << u
711 << " r=" << r
<< " tid=" << tid
<< dendl
;
// Lost a race: keep only entries that are still in the re-read journal.
716 std::vector
<fifo::journal_entry
> new_processed
;
717 std::unique_lock
l(m
);
718 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
719 << " update canceled, retrying: i=" << i
<< " tid="
721 for (auto& e
: processed
) {
722 auto jiter
= info
.journal
.find(e
.part_num
);
723 /* journal entry was already processed */
724 if (jiter
== info
.journal
.end() ||
725 !(jiter
->second
== e
)) {
728 new_processed
.push_back(e
);
730 processed
= std::move(new_processed
);
// Still canceled after the last attempt: the retry budget is used up.
733 if (r
== 0 && canceled
) {
734 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
735 << " canceled too many times, giving up: tid=" << tid
<< dendl
;
739 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
740 << " failed, r=: " << r
<< " tid=" << tid
<< dendl
;
// Journal the creation of the next part (optionally together with a set_head
// entry) and then process the journal.
//
// Under `m`, the next journal entry is generated with a fresh tag. If the
// journal already holds an entry for that part number, the part was journaled
// earlier but never processed, so process_journal() is run for it.
// Otherwise the current head part number and version are snapshotted. On the
// "needs new head" path a copy of the entry with op set_head is appended and
// new_head_part_num becomes the new part's number.
// NOTE(review): that path is presumably selected by `is_head`; the condition
// is not visible in this view.
//
// The journal entries are then written with _update_meta(), retrying up to
// MAX_RACE_RETRIES times while the update is canceled. After a canceled
// attempt the refreshed metadata is inspected: when both max_push_part_num
// and head_part_num have already reached the targets, another caller finished
// the work. Finally process_journal() executes the journaled entries.
745 int FIFO::_prepare_new_part(const DoutPrefixProvider
*dpp
, bool is_head
, std::uint64_t tid
, optional_yield y
)
747 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
748 << " entering: tid=" << tid
<< dendl
;
749 std::unique_lock
l(m
);
750 std::vector jentries
= { info
.next_journal_entry(generate_tag()) };
// The entry is already in the journal: it only has to be processed.
751 if (info
.journal
.find(jentries
.front().part_num
) != info
.journal
.end()) {
753 ldpp_dout(dpp
, 5) << __PRETTY_FUNCTION__
<< ":" << __LINE__
754 << " new part journaled, but not processed: tid="
756 auto r
= process_journal(dpp
, tid
, y
);
758 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
759 << " process_journal failed: r=" << r
<< " tid=" << tid
<< dendl
;
763 std::int64_t new_head_part_num
= info
.head_part_num
;
764 auto version
= info
.version
;
767 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
768 << " needs new head: tid=" << tid
<< dendl
;
// Same part number and tag as the create entry, but with op set_head.
769 auto new_head_jentry
= jentries
.front();
770 new_head_jentry
.op
= fifo::journal_entry::Op::set_head
;
771 new_head_part_num
= jentries
.front().part_num
;
772 jentries
.push_back(std::move(new_head_jentry
));
// Start as "canceled" so the retry loop below runs at least once.
777 bool canceled
= true;
778 for (auto i
= 0; canceled
&& i
< MAX_RACE_RETRIES
; ++i
) {
780 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
781 << " updating metadata: i=" << i
<< " tid=" << tid
<< dendl
;
782 auto u
= fifo::update
{}.journal_entries_add(jentries
);
783 r
= _update_meta(dpp
, u
, version
, &canceled
, tid
, y
);
784 if (r
>= 0 && canceled
) {
785 std::unique_lock
l(m
);
786 auto found
= (info
.journal
.find(jentries
.front().part_num
) !=
// Both targets already reached: another caller completed the work.
788 if ((info
.max_push_part_num
>= jentries
.front().part_num
&&
789 info
.head_part_num
>= new_head_part_num
)) {
790 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
791 << " raced, but journaled and processed: i=" << i
792 << " tid=" << tid
<< dendl
;
796 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
797 << " raced, journaled but not processed: i=" << i
798 << " tid=" << tid
<< dendl
;
804 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
805 << " _update_meta failed: update=" << u
<< " r=" << r
806 << " tid=" << tid
<< dendl
;
811 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
812 << " canceled too many times, giving up: tid=" << tid
<< dendl
;
// Execute what has been journaled.
815 r
= process_journal(dpp
, tid
, y
);
817 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
818 << " process_journal failed: r=" << r
<< " tid=" << tid
<< dendl
;
// Advance the head to the next part (current head_part_num + 1).
//
// Under `m`, the target head number, max_push_part_num and the metadata
// version are snapshotted.
// If no part with the target number exists yet (max_push_part_num is below
// it), _prepare_new_part(dpp, true, ...) is called; afterwards the metadata
// is checked again and an "inconsistency" is logged if max_push_part_num is
// still below the target.
// On the other path the head is updated directly with _update_meta(),
// retrying up to MAX_RACE_RETRIES times while the update is canceled. After
// each attempt head_part_num and version are re-read under `m`; a canceled
// attempt whose re-read head already reached the target means another caller
// completed the change.
// NOTE(review): the `else` joining the two paths and the return statements
// are not visible in this view.
823 int FIFO::_prepare_new_head(const DoutPrefixProvider
*dpp
, std::uint64_t tid
, optional_yield y
)
825 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
826 << " entering: tid=" << tid
<< dendl
;
827 std::unique_lock
l(m
);
828 std::int64_t new_head_num
= info
.head_part_num
+ 1;
829 auto max_push_part_num
= info
.max_push_part_num
;
830 auto version
= info
.version
;
// No part with the target number yet: one has to be prepared first.
834 if (max_push_part_num
< new_head_num
) {
835 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
836 << " need new part: tid=" << tid
<< dendl
;
837 r
= _prepare_new_part(dpp
, true, tid
, y
);
839 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
840 << " _prepare_new_part failed: r=" << r
841 << " tid=" << tid
<< dendl
;
844 std::unique_lock
l(m
);
845 if (info
.max_push_part_num
< new_head_num
) {
846 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
847 << " inconsistency, push part less than head part: "
848 << " tid=" << tid
<< dendl
;
// Start as "canceled" so the retry loop below runs at least once.
855 bool canceled
= true;
856 for (auto i
= 0; canceled
&& i
< MAX_RACE_RETRIES
; ++i
) {
857 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
858 << " updating head: i=" << i
<< " tid=" << tid
<< dendl
;
859 auto u
= fifo::update
{}.head_part_num(new_head_num
);
860 r
= _update_meta(dpp
, u
, version
, &canceled
, tid
, y
);
862 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
863 << " _update_meta failed: update=" << u
<< " r=" << r
864 << " tid=" << tid
<< dendl
;
// Refresh the snapshot for the next attempt.
867 std::unique_lock
l(m
);
868 auto head_part_num
= info
.head_part_num
;
869 version
= info
.version
;
871 if (canceled
&& (head_part_num
>= new_head_num
)) {
872 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
873 << " raced, but completed by the other caller: i=" << i
874 << " tid=" << tid
<< dendl
;
879 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
880 << " canceled too many times, giving up: tid=" << tid
<< dendl
;
// Completion state for the asynchronous FIFO::_prepare_new_part().
//
// It keeps the journal entries being added, the target head part number and
// a `canceled` flag that _update_meta() fills in.
//
// handle() runs when the journal update completes. A failure is logged and
// completes with the error. On the race path the current metadata is
// inspected under f->m:
//  - max_push_part_num and head_part_num have both reached the targets:
//    complete with 0.
//  - the retry counter `i` has reached MAX_RACE_RETRIES: complete with
//    -ECANCELED.
//  - otherwise the journal update is retried with the refreshed version,
//    re-arming this object with call().
// Finally f->process_journal() is started with the wrapped completion.
// NOTE(review): the conditions selecting these paths, and the use of `found`,
// are on lines not visible in this view.
886 struct NewPartPreparer
: public Completion
<NewPartPreparer
> {
888 std::vector
<fifo::journal_entry
> jentries
;
890 std::int64_t new_head_part_num
;
// Written by _update_meta() through the pointer passed to it.
891 bool canceled
= false;
894 NewPartPreparer(const DoutPrefixProvider
*dpp
, FIFO
* f
, lr::AioCompletion
* super
,
895 std::vector
<fifo::journal_entry
> jentries
,
896 std::int64_t new_head_part_num
,
898 : Completion(dpp
, super
), f(f
), jentries(std::move(jentries
)),
899 new_head_part_num(new_head_part_num
), tid(tid
) {}
// Invoked with the result `r` of the pending _update_meta().
901 void handle(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
) {
902 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
903 << " entering: tid=" << tid
<< dendl
;
905 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
906 << " _update_meta failed: r=" << r
907 << " tid=" << tid
<< dendl
;
908 complete(std::move(p
), r
);
// Read the refreshed metadata under the FIFO's mutex.
913 std::unique_lock
l(f
->m
);
914 auto iter
= f
->info
.journal
.find(jentries
.front().part_num
);
915 auto max_push_part_num
= f
->info
.max_push_part_num
;
916 auto head_part_num
= f
->info
.head_part_num
;
917 auto version
= f
->info
.version
;
918 auto found
= (iter
!= f
->info
.journal
.end());
920 if ((max_push_part_num
>= jentries
.front().part_num
&&
921 head_part_num
>= new_head_part_num
)) {
922 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
923 << " raced, but journaled and processed: i=" << i
924 << " tid=" << tid
<< dendl
;
925 complete(std::move(p
), 0);
// Retry budget exhausted.
928 if (i
>= MAX_RACE_RETRIES
) {
929 complete(std::move(p
), -ECANCELED
);
934 f
->_update_meta(dpp
, fifo::update
{}
935 .journal_entries_add(jentries
),
936 version
, &canceled
, tid
, call(std::move(p
)));
939 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
940 << " raced, journaled but not processed: i=" << i
941 << " tid=" << tid
<< dendl
;
944 // Fall through. We still need to process the journal.
946 f
->process_journal(dpp
, tid
, super());
// Asynchronous variant of _prepare_new_part().
//
// Under `m`, the next journal entry is generated with a fresh tag. If the
// journal already holds an entry for that part number, process_journal() is
// started with `c`. Otherwise the head part number and version are
// snapshotted, a set_head copy of the entry is appended on the new-head
// path, and the entries are written with _update_meta(); a NewPartPreparer
// takes over when that completes.
// NOTE(review): the new-head path is presumably selected by `is_head`, and
// `np` presumably points at the NewPartPreparer owned by `n`; neither line
// is visible in this view.
951 void FIFO::_prepare_new_part(const DoutPrefixProvider
*dpp
, bool is_head
, std::uint64_t tid
,
952 lr::AioCompletion
* c
)
954 std::unique_lock
l(m
);
955 std::vector jentries
= { info
.next_journal_entry(generate_tag()) };
// The entry is already in the journal: it only has to be processed.
956 if (info
.journal
.find(jentries
.front().part_num
) != info
.journal
.end()) {
958 ldpp_dout(dpp
, 5) << __PRETTY_FUNCTION__
<< ":" << __LINE__
959 << " new part journaled, but not processed: tid="
961 process_journal(dpp
, tid
, c
);
964 std::int64_t new_head_part_num
= info
.head_part_num
;
965 auto version
= info
.version
;
// Same part number and tag as the create entry, but with op set_head.
968 auto new_head_jentry
= jentries
.front();
969 new_head_jentry
.op
= fifo::journal_entry::Op::set_head
;
970 new_head_part_num
= jentries
.front().part_num
;
971 jentries
.push_back(std::move(new_head_jentry
));
975 auto n
= std::make_unique
<NewPartPreparer
>(dpp
, this, c
, jentries
,
976 new_head_part_num
, tid
);
978 _update_meta(dpp
, fifo::update
{}.journal_entries_add(jentries
), version
,
979 &np
->canceled
, tid
, NewPartPreparer::call(std::move(n
)));
// Completion state for the asynchronous FIFO::_prepare_new_head().
//
// `newpart` records which operation this object is waiting for: handle()
// forwards to handle_newpart() or handle_update().
//
// handle_newpart(): a failure of _prepare_new_part is logged and completes
// with the error. Otherwise, under f->m, a max_push_part_num still below
// new_head_num is logged and completes with -EIO; else it completes with 0.
//
// handle_update(): reads head_part_num and version under f->m. A failed
// update completes with the error. When the retry counter `i` has reached
// MAX_RACE_RETRIES it completes with -ECANCELED. If the head is still below
// new_head_num the update is re-issued with the refreshed version, re-arming
// this object with call(); otherwise it completes with 0.
// NOTE(review): the conditions on `r`, `canceled` and `newpart` that select
// these paths are on lines not visible in this view.
982 struct NewHeadPreparer
: public Completion
<NewHeadPreparer
> {
986 std::int64_t new_head_num
;
// Written by _update_meta() through the pointer passed to it.
987 bool canceled
= false;
990 NewHeadPreparer(const DoutPrefixProvider
*dpp
, FIFO
* f
, lr::AioCompletion
* super
,
991 bool newpart
, std::int64_t new_head_num
, std::uint64_t tid
)
992 : Completion(dpp
, super
), f(f
), newpart(newpart
), new_head_num(new_head_num
),
// Entry point invoked when the pending operation completes with result `r`.
995 void handle(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
) {
997 handle_newpart(std::move(p
), r
);
999 handle_update(dpp
, std::move(p
), r
);
// Handles completion of _prepare_new_part().
1002 void handle_newpart(Ptr
&& p
, int r
) {
1004 lderr(f
->cct
) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1005 << " _prepare_new_part failed: r=" << r
1006 << " tid=" << tid
<< dendl
;
1007 complete(std::move(p
), r
);
1010 std::unique_lock
l(f
->m
);
// The new part should have raised max_push_part_num to the target.
1011 if (f
->info
.max_push_part_num
< new_head_num
) {
1013 lderr(f
->cct
) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1014 << " _prepare_new_part failed: r=" << r
1015 << " tid=" << tid
<< dendl
;
1016 complete(std::move(p
), -EIO
);
1019 complete(std::move(p
), 0);
// Handles completion of the head-advancing _update_meta().
1023 void handle_update(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
) {
1024 std::unique_lock
l(f
->m
);
1025 auto head_part_num
= f
->info
.head_part_num
;
1026 auto version
= f
->info
.version
;
1030 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1031 << " _update_meta failed: r=" << r
1032 << " tid=" << tid
<< dendl
;
1033 complete(std::move(p
), r
);
// Retry budget exhausted.
1037 if (i
>= MAX_RACE_RETRIES
) {
1038 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1039 << " canceled too many times, giving up: tid=" << tid
<< dendl
;
1040 complete(std::move(p
), -ECANCELED
);
1044 // Raced, but there's still work to do!
1045 if (head_part_num
< new_head_num
) {
1048 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1049 << " updating head: i=" << i
<< " tid=" << tid
<< dendl
;
1050 f
->_update_meta(dpp
, fifo::update
{}.head_part_num(new_head_num
),
1051 version
, &this->canceled
, tid
, call(std::move(p
)));
1055 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1056 << " succeeded : i=" << i
<< " tid=" << tid
<< dendl
;
1057 complete(std::move(p
), 0);
// Asynchronous variant of _prepare_new_head().
//
// Under `m`, the target head number (head_part_num + 1), max_push_part_num
// and the version are snapshotted.
// If no part with the target number exists yet, a NewHeadPreparer with
// newpart=true is created and _prepare_new_part() is started with it.
// Otherwise a NewHeadPreparer with newpart=false is created and the head is
// updated directly with _update_meta().
// NOTE(review): `np` presumably points at the NewHeadPreparer owned by `n`;
// its declaration is not visible in this view.
1062 void FIFO::_prepare_new_head(const DoutPrefixProvider
*dpp
, std::uint64_t tid
, lr::AioCompletion
* c
)
1064 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1065 << " entering: tid=" << tid
<< dendl
;
1066 std::unique_lock
l(m
);
1067 int64_t new_head_num
= info
.head_part_num
+ 1;
1068 auto max_push_part_num
= info
.max_push_part_num
;
1069 auto version
= info
.version
;
// No part with the target number yet: one has to be prepared first.
1072 if (max_push_part_num
< new_head_num
) {
1073 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1074 << " need new part: tid=" << tid
<< dendl
;
1075 auto n
= std::make_unique
<NewHeadPreparer
>(dpp
, this, c
, true, new_head_num
,
1077 _prepare_new_part(dpp
, true, tid
, NewHeadPreparer::call(std::move(n
)));
1079 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1080 << " updating head: tid=" << tid
<< dendl
;
1081 auto n
= std::make_unique
<NewHeadPreparer
>(dpp
, this, c
, false, new_head_num
,
1084 _update_meta(dpp
, fifo::update
{}.head_part_num(new_head_num
), version
,
1085 &np
->canceled
, tid
, NewHeadPreparer::call(std::move(n
)));
// Synchronously push `data_bufs` to the current head part.
//
// Under `m`, the head part number, info.head_tag and the head part's object
// id (info.part_oid(head_part_num)) are captured. The buffers are then sent
// with push_part(); a failure is logged as "push_part failed".
1089 int FIFO::push_entries(const DoutPrefixProvider
*dpp
, const std::deque
<cb::list
>& data_bufs
,
1090 std::uint64_t tid
, optional_yield y
)
1092 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1093 << " entering: tid=" << tid
<< dendl
;
// `info` is read under the mutex.
1094 std::unique_lock
l(m
);
1095 auto head_part_num
= info
.head_part_num
;
1096 auto tag
= info
.head_tag
;
1097 const auto part_oid
= info
.part_oid(head_part_num
);
1100 auto r
= push_part(dpp
, ioctx
, part_oid
, tag
, data_bufs
, tid
, y
);
1102 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1103 << " push_part failed: r=" << r
<< " tid=" << tid
<< dendl
;
// Asynchronous variant of push_entries(): captures the head part number, tag
// and part object id under `m`, then hands the buffers to the asynchronous
// push_part() overload together with completion `c`.
1108 void FIFO::push_entries(const std::deque
<cb::list
>& data_bufs
,
1109 std::uint64_t tid
, lr::AioCompletion
* c
)
1111 std::unique_lock
l(m
);
1112 auto head_part_num
= info
.head_part_num
;
1113 auto tag
= info
.head_tag
;
1114 const auto part_oid
= info
.part_oid(head_part_num
);
1117 push_part(ioctx
, part_oid
, tag
, data_bufs
, tid
, c
);
// Synchronously trim part `part_num` up to offset `ofs`.
//
// The part's object id is read from `info` under `m`. The request is queued
// with the free function rgw::cls::fifo::trim_part() (qualified because this
// member has the same name) and run through rgw_rados_operate(). A failure
// is logged as "trim_part failed".
1120 int FIFO::trim_part(const DoutPrefixProvider
*dpp
, int64_t part_num
, uint64_t ofs
,
1121 std::optional
<std::string_view
> tag
,
1122 bool exclusive
, std::uint64_t tid
,
1125 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1126 << " entering: tid=" << tid
<< dendl
;
1127 lr::ObjectWriteOperation op
;
1128 std::unique_lock
l(m
);
1129 const auto part_oid
= info
.part_oid(part_num
);
1131 rgw::cls::fifo::trim_part(&op
, tag
, ofs
, exclusive
);
1132 auto r
= rgw_rados_operate(dpp
, ioctx
, part_oid
, &op
, y
);
1134 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1135 << " trim_part failed: r=" << r
<< " tid=" << tid
<< dendl
;
// Asynchronous variant of trim_part(): queues the same TRIM_PART request and
// submits it with ioctx.aio_operate() using completion `c`.
// The submission itself is asserted to succeed (r >= 0); the result of the
// operation is reported through `c`.
1140 void FIFO::trim_part(int64_t part_num
, uint64_t ofs
,
1141 std::optional
<std::string_view
> tag
,
1142 bool exclusive
, std::uint64_t tid
,
1143 lr::AioCompletion
* c
)
1145 ldout(cct
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1146 << " entering: tid=" << tid
<< dendl
;
1147 lr::ObjectWriteOperation op
;
1148 std::unique_lock
l(m
);
1149 const auto part_oid
= info
.part_oid(part_num
);
1151 rgw::cls::fifo::trim_part(&op
, tag
, ofs
, exclusive
);
1152 auto r
= ioctx
.aio_operate(part_oid
, c
, &op
);
1153 ceph_assert(r
>= 0);
// Open an existing FIFO stored in object `oid`.
//
// dpp   - log prefix provider.
// ioctx - I/O context; moved into the new FIFO.
// oid   - metadata object id.
// fifo  - receives the opened FIFO on success.
// y     - passed through to the blocking calls.
// objv  - passed to get_meta().
//
// The metadata is read with get_meta(). A failure is logged unless `probe`
// is set and the error is -ENOENT or -ENODATA. A FIFO is then constructed
// and given the part header size and entry overhead that were read. If the
// journal is not empty it is processed right away, in case an earlier writer
// crashed mid-transaction.
//
// `oid` is passed to get_meta() as a plain lvalue: it is used again below to
// construct the FIFO, so it must not be wrapped in std::move() first.
1156 int FIFO::open(const DoutPrefixProvider
*dpp
, lr::IoCtx ioctx
, std::string oid
, std::unique_ptr
<FIFO
>* fifo
,
1157 optional_yield y
, std::optional
<fifo::objv
> objv
,
1161 << __PRETTY_FUNCTION__
<< ":" << __LINE__
1162 << " entering" << dendl
;
1166 int r
= get_meta(dpp
, ioctx
, oid
, objv
, &info
, &size
, &over
, 0, y
,
// When probing, a missing object or missing data is not logged as an error.
1169 if (!(probe
&& (r
== -ENOENT
|| r
== -ENODATA
))) {
1170 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1171 << " get_meta failed: r=" << r
<< dendl
;
1175 std::unique_ptr
<FIFO
> f(new FIFO(std::move(ioctx
), oid
));
1177 f
->part_header_size
= size
;
1178 f
->part_entry_overhead
= over
;
1179 // If there are journal entries, process them, in case
1180 // someone crashed mid-transaction.
1181 if (!info
.journal
.empty()) {
1183 << __PRETTY_FUNCTION__
<< ":" << __LINE__
1184 << " processing leftover journal" << dendl
;
1185 r
= f
->process_journal(dpp
, 0, y
);
1187 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1188 << " process_journal failed: r=" << r
<< dendl
;
// Hand the opened FIFO to the caller.
1192 *fifo
= std::move(f
);
1196 int FIFO::create(const DoutPrefixProvider
*dpp
, lr::IoCtx ioctx
, std::string oid
, std::unique_ptr
<FIFO
>* fifo
,
1197 optional_yield y
, std::optional
<fifo::objv
> objv
,
1198 std::optional
<std::string_view
> oid_prefix
,
1199 bool exclusive
, std::uint64_t max_part_size
,
1200 std::uint64_t max_entry_size
)
1203 << __PRETTY_FUNCTION__
<< ":" << __LINE__
1204 << " entering" << dendl
;
1205 lr::ObjectWriteOperation op
;
1206 create_meta(&op
, oid
, objv
, oid_prefix
, exclusive
, max_part_size
,
1208 auto r
= rgw_rados_operate(dpp
, ioctx
, oid
, &op
, y
);
1210 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1211 << " create_meta failed: r=" << r
<< dendl
;
1214 r
= open(dpp
, std::move(ioctx
), std::move(oid
), fifo
, y
, objv
);
// Synchronously re-read the FIFO metadata object. The freshly fetched info and
// part layout values are installed under the FIFO mutex, but only when the
// fetched version is the same as or later than the cached one.
1218 int FIFO::read_meta(const DoutPrefixProvider
*dpp
, std::uint64_t tid
, optional_yield y
) {
1219 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1220 << " entering: tid=" << tid
<< dendl
;
// Fetch into temporaries first so the cached state is untouched on failure.
1225 auto r
= get_meta(dpp
, ioctx
, oid
, nullopt
, &_info
, &_phs
, &_peo
, tid
, y
);
1227 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1228 << " get_meta failed: r=" << r
<< " tid=" << tid
<< dendl
;
1231 std::unique_lock
l(m
);
1232 // Adopt the fetched metadata unless we already hold a newer version.
1233 if (_info
.version
.same_or_later(this->info
.version
)) {
1234 info
= std::move(_info
);
1235 part_header_size
= _phs
;
1236 part_entry_overhead
= _peo
;
1241 int FIFO::read_meta(const DoutPrefixProvider
*dpp
, optional_yield y
) {
1242 std::unique_lock
l(m
);
1243 auto tid
= ++next_tid
;
1245 return read_meta(dpp
, tid
, y
);
// Completion used by the asynchronous read_meta(). When the GET_META call
// finishes, handle() decodes the reply from `bl` and, under the FIFO mutex,
// installs the decoded info and part layout values if their version is the
// same as or later than the cached one. A decode failure is logged and turned
// into an error code with from_error_code(); the outcome is then forwarded to
// the wrapped completion through complete().
1248 struct Reader
: public Completion
<Reader
> {
1252 Reader(const DoutPrefixProvider
*dpp
, FIFO
* fifo
, lr::AioCompletion
* super
, std::uint64_t tid
)
1253 : Completion(dpp
, super
), fifo(fifo
), tid(tid
) {}
// Invoked with the result `r` of the asynchronous GET_META call.
1255 void handle(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
) {
1256 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1257 << " entering: tid=" << tid
<< dendl
;
1259 fifo::op::get_meta_reply reply
;
1260 auto iter
= bl
.cbegin();
1261 decode(reply
, iter
);
// Cached metadata is only replaced while holding the FIFO mutex.
1262 std::unique_lock
l(fifo
->m
);
1263 if (reply
.info
.version
.same_or_later(fifo
->info
.version
)) {
1264 fifo
->info
= std::move(reply
.info
);
1265 fifo
->part_header_size
= reply
.part_header_size
;
1266 fifo
->part_entry_overhead
= reply
.part_entry_overhead
;
1268 } catch (const cb::error
& err
) {
1269 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1270 << " failed to decode response err=" << err
.what()
1271 << " tid=" << tid
<< dendl
;
1272 r
= from_error_code(err
.code());
1274 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1275 << " read_meta failed r=" << r
1276 << " tid=" << tid
<< dendl
;
1278 complete(std::move(p
), r
);
// Asynchronous metadata refresh: issues the cls_fifo GET_META call on the
// metadata object with a Reader as its completion, which decodes the reply and
// updates the cached state before signalling `c`.
1282 void FIFO::read_meta(const DoutPrefixProvider
*dpp
, std::uint64_t tid
, lr::AioCompletion
* c
)
1284 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1285 << " entering: tid=" << tid
<< dendl
;
1286 lr::ObjectReadOperation op
;
1287 fifo::op::get_meta gm
;
1290 auto reader
= std::make_unique
<Reader
>(dpp
, this, c
, tid
);
// Keep a raw pointer: ownership of `reader` is given to Reader::call() in the
// same expression that must also name the Reader's `bl` as the output buffer.
1291 auto rp
= reader
.get();
1292 auto r
= ioctx
.aio_exec(oid
, Reader::call(std::move(reader
)), fifo::op::CLASS
,
1293 fifo::op::GET_META
, in
, &rp
->bl
);
// Accessor for FIFO metadata; hands back a const reference rather than a copy.
// NOTE(review): the body is not visible in this listing -- presumably it
// returns the cached `info` member; confirm.
1297 const fifo::info
& FIFO::meta() const {
1301 std::pair
<std::uint32_t, std::uint32_t> FIFO::get_part_layout_info() const {
1302 return {part_header_size
, part_entry_overhead
};
1305 int FIFO::push(const DoutPrefixProvider
*dpp
, const cb::list
& bl
, optional_yield y
) {
1306 return push(dpp
, std::vector
{ bl
}, y
);
1309 void FIFO::push(const DoutPrefixProvider
*dpp
, const cb::list
& bl
, lr::AioCompletion
* c
) {
1310 push(dpp
, std::vector
{ bl
}, c
);
// Synchronous multi-entry push. Entries are validated against max_entry_size,
// a new head part is prepared when the metadata says one is needed, and the
// entries are then sent in batches sized against max_part_size. The loop runs
// while entries remain and the retry counter is within MAX_RACE_RETRIES.
1313 int FIFO::push(const DoutPrefixProvider
*dpp
, const std::vector
<cb::list
>& data_bufs
, optional_yield y
)
// Snapshot the values needed from `info` while holding the FIFO mutex.
1315 std::unique_lock
l(m
);
1316 auto tid
= ++next_tid
;
1317 auto max_entry_size
= info
.params
.max_entry_size
;
1318 auto need_new_head
= info
.need_new_head();
1320 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1321 << " entering: tid=" << tid
<< dendl
;
// An empty push is reported as success without touching RADOS.
1322 if (data_bufs
.empty()) {
1323 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1324 << " empty push, returning success tid=" << tid
<< dendl
;
// Reject the whole push if any single entry exceeds max_entry_size.
1329 for (const auto& bl
: data_bufs
) {
1330 if (bl
.length() > max_entry_size
) {
1331 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1332 << " entry bigger than max_entry_size tid=" << tid
<< dendl
;
1338 if (need_new_head
) {
1339 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1340 << " need new head tid=" << tid
<< dendl
;
1341 r
= _prepare_new_head(dpp
, tid
, y
);
1343 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1344 << " _prepare_new_head failed: r=" << r
1345 << " tid=" << tid
<< dendl
;
// `remaining` holds entries not yet batched; `batch` holds the entries of the
// push currently being attempted.
1350 std::deque
<cb::list
> remaining(data_bufs
.begin(), data_bufs
.end());
1351 std::deque
<cb::list
> batch
;
1353 uint64_t batch_len
= 0;
1355 bool canceled
= true;
1356 while ((!remaining
.empty() || !batch
.empty()) &&
1357 (retries
<= MAX_RACE_RETRIES
)) {
1358 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1359 << " preparing push: remaining=" << remaining
.size()
1360 << " batch=" << batch
.size() << " retries=" << retries
1361 << " tid=" << tid
<< dendl
;
// Re-read the size limits under the mutex on every iteration.
1362 std::unique_lock
l(m
);
1363 auto max_part_size
= info
.params
.max_part_size
;
1364 auto overhead
= part_entry_overhead
;
// Move entries from `remaining` into `batch` while the next one still fits.
1367 while (!remaining
.empty() &&
1368 (remaining
.front().length() + batch_len
<= max_part_size
)) {
1369 /* We can send entries with data_len up to max_entry_size,
1370 however, we want to also account the overhead when
1371 dealing with multiple entries. Previous check doesn't
1372 account for overhead on purpose. */
1373 batch_len
+= remaining
.front().length() + overhead
;
1374 batch
.push_back(std::move(remaining
.front()));
1375 remaining
.pop_front();
1377 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1378 << " prepared push: remaining=" << remaining
.size()
1379 << " batch=" << batch
.size() << " retries=" << retries
1380 << " batch_len=" << batch_len
1381 << " tid=" << tid
<< dendl
;
1383 auto r
= push_entries(dpp
, batch
, tid
, y
);
1387 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1388 << " need new head tid=" << tid
<< dendl
;
1389 r
= _prepare_new_head(dpp
, tid
, y
);
1391 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1392 << " prepare_new_head failed: r=" << r
1393 << " tid=" << tid
<< dendl
;
1400 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1401 << " push_entries failed: r=" << r
1402 << " tid=" << tid
<< dendl
;
1405 // Made forward progress!
// A non-negative `r` is compared against batch.size() and used as an element
// count below, i.e. it is treated as the number of entries that were pushed.
1409 if (static_cast<unsigned>(r
) == batch
.size()) {
// Partial success: drop the pushed prefix and recompute the length of the
// entries that still have to be sent.
1412 batch
.erase(batch
.begin(), batch
.begin() + r
);
1413 for (const auto& b
: batch
) {
1414 batch_len
+= b
.length() + part_entry_overhead
;
1419 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1420 << " canceled too many times, giving up: tid=" << tid
<< dendl
;
// Completion driving the asynchronous multi-entry push. It owns the entries
// still to be sent (`remaining`) and the batch currently in flight (`batch`),
// and alternates between pushing a batch and preparing a new head part until
// everything has been written or an error / retry limit ends the operation.
1426 struct Pusher
: public Completion
<Pusher
> {
1428 std::deque
<cb::list
> remaining
;
1429 std::deque
<cb::list
> batch
;
// Set while a _prepare_new_head() request is outstanding; handle() clears it
// before dispatching to handle_new_head().
1432 bool new_heading
= false;
// Drop the first `successes` entries from `batch` (they were pushed), refill
// the batch from `remaining` up to max_part_size, and either complete the
// operation when nothing is left or issue the next push.
1434 void prep_then_push(Ptr
&& p
, const unsigned successes
) {
1435 std::unique_lock
l(f
->m
);
1436 auto max_part_size
= f
->info
.params
.max_part_size
;
1437 auto part_entry_overhead
= f
->part_entry_overhead
;
1440 ldout(f
->cct
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1441 << " preparing push: remaining=" << remaining
.size()
1442 << " batch=" << batch
.size() << " i=" << i
1443 << " tid=" << tid
<< dendl
;
1445 uint64_t batch_len
= 0;
1446 if (successes
> 0) {
1447 if (successes
== batch
.size()) {
1450 batch
.erase(batch
.begin(), batch
.begin() + successes
);
1451 for (const auto& b
: batch
) {
1452 batch_len
+= b
.length() + part_entry_overhead
;
// Nothing in flight and nothing queued: the push is finished.
1457 if (batch
.empty() && remaining
.empty()) {
1458 complete(std::move(p
), 0);
1462 while (!remaining
.empty() &&
1463 (remaining
.front().length() + batch_len
<= max_part_size
)) {
1465 /* We can send entries with data_len up to max_entry_size,
1466 however, we want to also account the overhead when
1467 dealing with multiple entries. Previous check doesn't
1468 account for overhead on purpose. */
1469 batch_len
+= remaining
.front().length() + part_entry_overhead
;
1470 batch
.push_back(std::move(remaining
.front()));
1471 remaining
.pop_front();
1473 ldout(f
->cct
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1474 << " prepared push: remaining=" << remaining
.size()
1475 << " batch=" << batch
.size() << " i=" << i
1476 << " batch_len=" << batch_len
1477 << " tid=" << tid
<< dendl
;
// Send the current batch; this object is re-armed as the completion.
1481 void push(Ptr
&& p
) {
1482 f
->push_entries(batch
, tid
, call(std::move(p
)));
// Ask the FIFO to prepare a new head part; this object is re-armed as the
// completion.
1485 void new_head(const DoutPrefixProvider
*dpp
, Ptr
&& p
) {
1487 f
->_prepare_new_head(dpp
, tid
, call(std::move(p
)));
// Completion entry point for both push_entries and _prepare_new_head replies.
1490 void handle(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
) {
1493 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1494 << " need new head tid=" << tid
<< dendl
;
1495 new_head(dpp
, std::move(p
));
1499 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1500 << " push_entries failed: r=" << r
1501 << " tid=" << tid
<< dendl
;
1502 complete(std::move(p
), r
);
1505 i
= 0; // We've made forward progress, so reset the race counter!
1506 prep_then_push(std::move(p
), r
);
1509 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1510 << " prepare_new_head failed: r=" << r
1511 << " tid=" << tid
<< dendl
;
1512 complete(std::move(p
), r
);
1515 new_heading
= false;
1516 handle_new_head(std::move(p
), r
);
// Continue after a new-head attempt. -ECANCELED is treated as a lost race and
// is fatal once the counter `i` has reached MAX_RACE_RETRIES.
1520 void handle_new_head(Ptr
&& p
, int r
) {
1521 if (r
== -ECANCELED
) {
1522 if (p
->i
== MAX_RACE_RETRIES
) {
1523 lderr(f
->cct
) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1524 << " canceled too many times, giving up: tid=" << tid
<< dendl
;
1525 complete(std::move(p
), -ECANCELED
);
1530 complete(std::move(p
), r
);
1534 if (p
->batch
.empty()) {
1535 prep_then_push(std::move(p
), 0);
// Takes ownership of the entries to push; `super` is the caller's completion.
1543 Pusher(const DoutPrefixProvider
*dpp
, FIFO
* f
, std::deque
<cb::list
>&& remaining
,
1544 std::uint64_t tid
, lr::AioCompletion
* super
)
1545 : Completion(dpp
, super
), f(f
), remaining(std::move(remaining
)),
// Asynchronous multi-entry push. Builds a Pusher holding a copy of the
// entries, completes it immediately with -E2BIG for an oversized entry or with
// 0 for an empty push, and otherwise starts it either by preparing a new head
// part or by sending the first batch.
1549 void FIFO::push(const DoutPrefixProvider
*dpp
, const std::vector
<cb::list
>& data_bufs
,
1550 lr::AioCompletion
* c
)
// Snapshot the values needed from `info` while holding the FIFO mutex.
1552 std::unique_lock
l(m
);
1553 auto tid
= ++next_tid
;
1554 auto max_entry_size
= info
.params
.max_entry_size
;
1555 auto need_new_head
= info
.need_new_head();
1557 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1558 << " entering: tid=" << tid
<< dendl
;
// The Pusher is created up front so that the early-exit paths below can
// report their result through the same completion mechanism.
1559 auto p
= std::make_unique
<Pusher
>(dpp
, this, std::deque
<cb::list
>(data_bufs
.begin(), data_bufs
.end()),
1562 for (const auto& bl
: data_bufs
) {
1563 if (bl
.length() > max_entry_size
) {
1564 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1565 << " entry bigger than max_entry_size tid=" << tid
<< dendl
;
1566 Pusher::complete(std::move(p
), -E2BIG
);
1571 if (data_bufs
.empty() ) {
1572 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1573 << " empty push, returning success tid=" << tid
<< dendl
;
1574 Pusher::complete(std::move(p
), 0);
1578 if (need_new_head
) {
1579 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1580 << " need new head tid=" << tid
<< dendl
;
// `p` is dereferenced for the call and passed as an rvalue reference in the
// same expression; ownership is taken inside the callee.
1581 p
->new_head(dpp
, std::move(p
));
1583 p
->prep_then_push(std::move(p
), 0);
// Synchronous listing. Starting at the tail part (or at the part/offset named
// by the optional marker string) it lists part after part, converting each
// part entry into a list_entry whose marker encodes {part number, offset},
// until max_entries have been collected or the end of the data is reached.
// Results are stored through presult.
1587 int FIFO::list(const DoutPrefixProvider
*dpp
, int max_entries
,
1588 std::optional
<std::string_view
> markstr
,
1589 std::vector
<list_entry
>* presult
, bool* pmore
,
1592 std::unique_lock
l(m
);
1593 auto tid
= ++next_tid
;
// Default starting point: the current tail part, offset 0.
1594 std::int64_t part_num
= info
.tail_part_num
;
1596 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1597 << " entering: tid=" << tid
<< dendl
;
1598 std::uint64_t ofs
= 0;
// A caller-supplied marker overrides the starting part.
1600 auto marker
= to_marker(*markstr
);
1602 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1603 << " invalid marker string: " << markstr
1604 << " tid= "<< tid
<< dendl
;
1607 part_num
= marker
->num
;
1611 std::vector
<list_entry
> result
;
1612 result
.reserve(max_entries
);
1615 std::vector
<fifo::part_list_entry
> entries
;
1617 while (max_entries
> 0) {
1618 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1619 << " max_entries=" << max_entries
<< " tid=" << tid
<< dendl
;
1620 bool part_more
= false;
1621 bool part_full
= false;
1623 std::unique_lock
l(m
);
1624 auto part_oid
= info
.part_oid(part_num
);
1627 r
= list_part(dpp
, ioctx
, part_oid
, {}, ofs
, max_entries
, &entries
,
1628 &part_more
, &part_full
, nullptr, tid
, y
);
// The part object was not found: refresh the metadata to find out whether a
// concurrent trim removed it or it simply has not been written yet.
1630 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1631 << " missing part, rereading metadata"
1632 << " tid= "<< tid
<< dendl
;
1633 r
= read_meta(dpp
, tid
, y
);
1635 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1636 << " read_meta failed: r=" << r
1637 << " tid= "<< tid
<< dendl
;
1640 if (part_num
< info
.tail_part_num
) {
1641 /* raced with trim? restart */
1642 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1643 << " raced with trim, restarting: tid=" << tid
<< dendl
;
// Give back the budget consumed by results gathered before the restart.
1644 max_entries
+= result
.size();
1646 std::unique_lock
l(m
);
1647 part_num
= info
.tail_part_num
;
1652 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1653 << " assuming part was not written yet, so end of data: "
1654 << "tid=" << tid
<< dendl
;
1660 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1661 << " list_entries failed: r=" << r
1662 << " tid= "<< tid
<< dendl
;
1665 more
= part_full
|| part_more
;
// Convert each raw part entry into a caller-facing list_entry.
1666 for (auto& entry
: entries
) {
1668 e
.data
= std::move(entry
.data
);
1669 e
.marker
= marker
{part_num
, entry
.ofs
}.to_string();
1670 e
.mtime
= entry
.mtime
;
1671 result
.push_back(std::move(e
));
1673 if (max_entries
== 0)
1677 if (max_entries
> 0 &&
1682 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1683 << " head part is not full, so we can assume we're done: "
1684 << "tid=" << tid
<< dendl
;
1693 *presult
= std::move(result
);
// Synchronous trim up to the position encoded in `markstr`. Every part before
// the marker's part is trimmed completely (offset max_part_size), the
// marker's part is trimmed up to the marker offset, and the tail part number
// in the metadata is then advanced with _update_meta(), retrying while the
// update is canceled and the retry counter is within MAX_RACE_RETRIES.
// If the marker lies beyond the head part (even after re-reading the
// metadata) the trim is clamped to the end of the head part and -ENODATA is
// returned instead of 0.
1699 int FIFO::trim(const DoutPrefixProvider
*dpp
, std::string_view markstr
, bool exclusive
, optional_yield y
)
1701 bool overshoot
= false;
1702 auto marker
= to_marker(markstr
);
1706 auto part_num
= marker
->num
;
1707 auto ofs
= marker
->ofs
;
1708 std::unique_lock
l(m
);
1709 auto tid
= ++next_tid
;
1710 auto hn
= info
.head_part_num
;
1711 const auto max_part_size
= info
.params
.max_part_size
;
// Marker is past the head we know about: refresh the metadata and re-check.
1712 if (part_num
> hn
) {
1714 auto r
= read_meta(dpp
, tid
, y
);
1719 auto hn
= info
.head_part_num
;
1720 if (part_num
> hn
) {
1723 ofs
= max_part_size
;
1726 if (part_num
< info
.tail_part_num
) {
1729 auto pn
= info
.tail_part_num
;
1731 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1732 << " entering: tid=" << tid
<< dendl
;
// Fully trim every part that precedes the marker's part.
1735 while (pn
< part_num
) {
1736 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1737 << " pn=" << pn
<< " tid=" << tid
<< dendl
;
1738 std::unique_lock
l(m
);
1740 r
= trim_part(dpp
, pn
, max_part_size
, std::nullopt
, false, tid
, y
);
// A part that is already gone (-ENOENT) is fine; any other error is a real
// failure. This matches the check applied to the marker's own part below.
1741 if (r
< 0 && r
!= -ENOENT
) {
1742 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1743 << " trim_part failed: r=" << r
1744 << " tid= "<< tid
<< dendl
;
// Trim the marker's part up to the requested offset.
1749 r
= trim_part(dpp
, part_num
, ofs
, std::nullopt
, exclusive
, tid
, y
);
1750 if (r
< 0 && r
!= -ENOENT
) {
1751 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1752 << " trim_part failed: r=" << r
1753 << " tid= "<< tid
<< dendl
;
1758 auto tail_part_num
= info
.tail_part_num
;
1759 auto objv
= info
.version
;
// A metadata update is only needed if the tail actually has to move.
1761 bool canceled
= tail_part_num
< part_num
;
1763 while ((tail_part_num
< part_num
) &&
1765 (retries
<= MAX_RACE_RETRIES
)) {
1766 r
= _update_meta(dpp
, fifo::update
{}.tail_part_num(part_num
), objv
, &canceled
,
1769 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1770 << " _update_meta failed: r=" << r
1771 << " tid= "<< tid
<< dendl
;
1775 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1776 << " canceled: retries=" << retries
1777 << " tid=" << tid
<< dendl
;
// Pick up the current tail and version before the next attempt.
1779 tail_part_num
= info
.tail_part_num
;
1780 objv
= info
.version
;
1786 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1787 << " canceled too many times, giving up: tid=" << tid
<< dendl
;
1790 return overshoot
? -ENODATA
: 0;
// Completion driving the asynchronous trim. The `reread` and `update` flags
// select which stage a callback belongs to: the reply to a metadata re-read,
// the reply to a trim of a preceding part, or the stage in which the tail part
// number in the metadata is advanced. The operation completes with -ENODATA
// when `overshoot` is set and 0 otherwise.
1793 struct Trimmer
: public Completion
<Trimmer
> {
1795 std::int64_t part_num
;
1800 bool update
= false;
1801 bool reread
= false;
1802 bool canceled
= false;
1803 bool overshoot
= false;
// part_num/ofs: trim target; pn: next preceding part to trim.
1806 Trimmer(const DoutPrefixProvider
*dpp
, FIFO
* fifo
, std::int64_t part_num
, std::uint64_t ofs
, std::int64_t pn
,
1807 bool exclusive
, lr::AioCompletion
* super
, std::uint64_t tid
)
1808 : Completion(dpp
, super
), fifo(fifo
), part_num(part_num
), ofs(ofs
), pn(pn
),
1809 exclusive(exclusive
), tid(tid
) {}
// Completion entry point for every asynchronous step of the trim.
1811 void handle(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
) {
1812 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1813 << " entering: tid=" << tid
<< dendl
;
1818 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1819 << " read_meta failed: r="
1820 << r
<< " tid=" << tid
<< dendl
;
1821 complete(std::move(p
), r
);
// Metadata was re-read: re-evaluate the target against the fresh head/tail.
1824 std::unique_lock
l(fifo
->m
);
1825 auto hn
= fifo
->info
.head_part_num
;
1826 const auto max_part_size
= fifo
->info
.params
.max_part_size
;
1827 const auto tail_part_num
= fifo
->info
.tail_part_num
;
1829 if (part_num
> hn
) {
1831 ofs
= max_part_size
;
1834 if (part_num
< tail_part_num
) {
1835 complete(std::move(p
), -ENODATA
);
// Preceding parts are trimmed completely, one per callback.
1839 if (pn
< part_num
) {
1840 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1841 << " pn=" << pn
<< " tid=" << tid
<< dendl
;
1842 fifo
->trim_part(pn
++, max_part_size
, std::nullopt
,
1843 false, tid
, call(std::move(p
)));
1846 canceled
= tail_part_num
< part_num
;
1847 fifo
->trim_part(part_num
, ofs
, std::nullopt
, exclusive
, tid
,
1848 call(std::move(p
)));
1858 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1859 << (update
? " update_meta " : " trim ") << "failed: r="
1860 << r
<< " tid=" << tid
<< dendl
;
1861 complete(std::move(p
), r
);
1866 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1867 << " handling preceding trim callback: tid=" << tid
<< dendl
;
1869 if (pn
< part_num
) {
1870 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1871 << " pn=" << pn
<< " tid=" << tid
<< dendl
;
1872 std::unique_lock
l(fifo
->m
);
1873 const auto max_part_size
= fifo
->info
.params
.max_part_size
;
1875 fifo
->trim_part(pn
++, max_part_size
, std::nullopt
,
1876 false, tid
, call(std::move(p
)));
// All preceding parts are done: trim the target part itself.
1880 std::unique_lock
l(fifo
->m
);
1881 const auto tail_part_num
= fifo
->info
.tail_part_num
;
1884 canceled
= tail_part_num
< part_num
;
1885 fifo
->trim_part(part_num
, ofs
, std::nullopt
, exclusive
, tid
,
1886 call(std::move(p
)));
1890 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1891 << " handling update-needed callback: tid=" << tid
<< dendl
;
1892 std::unique_lock
l(fifo
->m
);
1893 auto tail_part_num
= fifo
->info
.tail_part_num
;
1894 auto objv
= fifo
->info
.version
;
// Advance the tail in the metadata while it is still behind the target,
// giving up with -EIO once the retry counter exceeds MAX_RACE_RETRIES.
1896 if ((tail_part_num
< part_num
) &&
1898 if (retries
> MAX_RACE_RETRIES
) {
1899 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1900 << " canceled too many times, giving up: tid=" << tid
<< dendl
;
1901 complete(std::move(p
), -EIO
);
1905 fifo
->_update_meta(dpp
, fifo::update
{}
1906 .tail_part_num(part_num
), objv
, &canceled
,
1907 tid
, call(std::move(p
)));
1909 complete(std::move(p
), overshoot
? -ENODATA
: 0);
// Asynchronous trim up to the position encoded in `markstr`. A Trimmer is
// created to drive the multi-step operation. An unparsable marker completes
// it immediately with -EINVAL. A marker beyond the known head first triggers
// a metadata re-read; otherwise the first trim_part request is issued for the
// current tail part.
1914 void FIFO::trim(const DoutPrefixProvider
*dpp
, std::string_view markstr
, bool exclusive
,
1915 lr::AioCompletion
* c
) {
1916 auto marker
= to_marker(markstr
);
// The Trimmer is constructed before the marker is validated, so fall back to
// a default-constructed marker for its constructor arguments.
1917 auto realmark
= marker
.value_or(::rgw::cls::fifo::marker
{});
// Snapshot head, tail and part size under the FIFO mutex. The tail part's
// object name is not needed here: trim_part() is given the part number and
// resolves the object name itself.
1918 std::unique_lock
l(m
);
1919 const auto hn
= info
.head_part_num
;
1920 const auto max_part_size
= info
.params
.max_part_size
;
1921 const auto pn
= info
.tail_part_num
;
1923 auto tid
= ++next_tid
;
1925 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1926 << " entering: tid=" << tid
<< dendl
;
1927 auto trimmer
= std::make_unique
<Trimmer
>(dpp
, this, realmark
.num
, realmark
.ofs
,
1928 pn
, exclusive
, c
, tid
);
1930 Trimmer::complete(std::move(trimmer
), -EINVAL
);
1934 auto ofs
= marker
->ofs
;
// Marker is past the head we know about: let the Trimmer re-evaluate after
// the metadata has been re-read.
1935 if (marker
->num
> hn
) {
1936 trimmer
->reread
= true;
1937 read_meta(dpp
, tid
, Trimmer::call(std::move(trimmer
)));
// The tail part precedes the marker's part, so it is trimmed completely.
1940 if (pn
< marker
->num
) {
1941 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1942 << " pn=" << pn
<< " tid=" << tid
<< dendl
;
1943 ofs
= max_part_size
;
1945 trimmer
->update
= true;
1947 trim_part(pn
, ofs
, std::nullopt
, exclusive
,
1948 tid
, Trimmer::call(std::move(trimmer
)));
// Synchronously fetch the header of part `part_num` into *header by calling
// the free helper rgw::cls::fifo::get_part_info() on the part's object.
// A failing result is logged at level -1 with the transaction id.
1951 int FIFO::get_part_info(const DoutPrefixProvider
*dpp
, int64_t part_num
,
1952 fifo::part_header
* header
,
// The object name and a fresh transaction id are obtained under the FIFO mutex.
1955 std::unique_lock
l(m
);
1956 const auto part_oid
= info
.part_oid(part_num
);
1957 auto tid
= ++next_tid
;
1959 auto r
= rgw::cls::fifo::get_part_info(dpp
, ioctx
, part_oid
, header
, tid
, y
);
1961 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1962 << " get_part_info failed: r="
1963 << r
<< " tid=" << tid
<< dendl
;
1968 void FIFO::get_part_info(int64_t part_num
,
1969 fifo::part_header
* header
,
1970 lr::AioCompletion
* c
)
1972 std::unique_lock
l(m
);
1973 const auto part_oid
= info
.part_oid(part_num
);
1974 auto tid
= ++next_tid
;
1976 auto op
= rgw::cls::fifo::get_part_info(cct
, header
, tid
);
1977 auto r
= ioctx
.aio_operate(part_oid
, c
, &op
, nullptr);
1978 ceph_assert(r
>= 0);
// Completion used by get_head_info(). The first callback handles the reply to
// the metadata re-read and then issues a get_part_info read for the head part;
// the second callback passes the resulting header to the user callback `f`
// and completes the operation.
1981 struct InfoGetter
: Completion
<InfoGetter
> {
// Filled in by the get_part_info read and moved into `f` at the end.
1983 fifo::part_header header
;
1984 fu2::function
<void(int r
, fifo::part_header
&&)> f
;
// Set once the head part read has been issued.
// NOTE(review): the assignment itself is not visible in this listing.
1986 bool headerread
= false;
1988 InfoGetter(const DoutPrefixProvider
*dpp
, FIFO
* fifo
, fu2::function
<void(int r
, fifo::part_header
&&)> f
,
1989 std::uint64_t tid
, lr::AioCompletion
* super
)
1990 : Completion(dpp
, super
), fifo(fifo
), f(std::move(f
)), tid(tid
) {}
1991 void handle(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
) {
1994 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
1995 << " read_meta failed: r="
1996 << r
<< " tid=" << tid
<< dendl
;
1999 complete(std::move(p
), r
);
// NOTE(review): meta() returns a const reference, so `auto` makes a full copy
// of the metadata here, and it is taken without holding fifo->m. Only
// head_part_num is used from it.
2003 auto info
= fifo
->meta();
2004 auto hpn
= info
.head_part_num
;
2006 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2007 << " no head, returning empty partinfo r="
2008 << r
<< " tid=" << tid
<< dendl
;
2011 complete(std::move(p
), r
);
// Issue the read of the head part's header; this object is re-armed as the
// completion for it.
2015 auto op
= rgw::cls::fifo::get_part_info(fifo
->cct
, &header
, tid
);
2016 std::unique_lock
l(fifo
->m
);
2017 auto oid
= fifo
->info
.part_oid(hpn
);
2019 r
= fifo
->ioctx
.aio_operate(oid
, call(std::move(p
)), &op
,
2021 ceph_assert(r
>= 0);
2026 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2027 << " get_part_info failed: r="
2028 << r
<< " tid=" << tid
<< dendl
;
// Hand the header (and the result code) to the user callback, then finish.
2032 f(r
, std::move(header
));
2033 complete(std::move(p
), r
);
2038 void FIFO::get_head_info(const DoutPrefixProvider
*dpp
, fu2::unique_function
<void(int r
,
2039 fifo::part_header
&&)> f
,
2040 lr::AioCompletion
* c
)
2042 std::unique_lock
l(m
);
2043 auto tid
= ++next_tid
;
2045 auto ig
= std::make_unique
<InfoGetter
>(dpp
, this, std::move(f
), tid
, c
);
2046 read_meta(dpp
, tid
, InfoGetter::call(std::move(ig
)));
// Completion driving asynchronous journal processing. It works on a copy of
// the FIFO's journal taken at construction time, issues one create/remove
// operation per journal entry, and finally "postprocesses": it writes the
// resulting tail/head/max part numbers back to the metadata and removes the
// processed entries from the journal, retrying when the metadata update is
// canceled.
2049 struct JournalProcessor
: public Completion
<JournalProcessor
> {
// Entries that have been carried out and still have to be removed from the
// journal stored in the metadata.
2053 std::vector
<fifo::journal_entry
> processed
;
// Private copy of the journal and the cursor into it.
2054 std::multimap
<std::int64_t, fifo::journal_entry
> journal
;
2055 std::multimap
<std::int64_t, fifo::journal_entry
>::iterator iter
;
// Part numbers to be written back by the metadata update.
2056 std::int64_t new_tail
;
2057 std::int64_t new_head
;
2058 std::int64_t new_max
;
2059 int race_retries
= 0;
2060 bool first_pp
= true;
2061 bool canceled
= false;
// Asynchronously create the object for part `part_num` (non-exclusively) and
// initialise it with part_init(); the reply is routed to finish_je() via
// state = entry_callback.
2069 void create_part(Ptr
&& p
, int64_t part_num
,
2070 std::string_view tag
) {
2071 ldout(fifo
->cct
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2072 << " entering: tid=" << tid
<< dendl
;
2073 state
= entry_callback
;
2074 lr::ObjectWriteOperation op
;
2075 op
.create(false); /* We don't need exclusivity, part_init ensures
2076 we're creating from the same journal entry. */
2077 std::unique_lock
l(fifo
->m
);
2078 part_init(&op
, tag
, fifo
->info
.params
);
2079 auto oid
= fifo
->info
.part_oid(part_num
);
2081 auto r
= fifo
->ioctx
.aio_operate(oid
, call(std::move(p
)), &op
);
2082 ceph_assert(r
>= 0);
// Asynchronously submit a write operation against the object for part
// `part_num`; the reply is routed to finish_je() via state = entry_callback.
// NOTE(review): the step added to `op` is on a line not shown in this listing
// -- presumably a remove; confirm.
2086 void remove_part(Ptr
&& p
, int64_t part_num
,
2087 std::string_view tag
) {
2088 ldout(fifo
->cct
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2089 << " entering: tid=" << tid
<< dendl
;
2090 state
= entry_callback
;
2091 lr::ObjectWriteOperation op
;
2093 std::unique_lock
l(fifo
->m
);
2094 auto oid
= fifo
->info
.part_oid(part_num
);
2096 auto r
= fifo
->ioctx
.aio_operate(oid
, call(std::move(p
)), &op
);
2097 ceph_assert(r
>= 0);
// Called when the operation for one journal entry has finished: records the
// entry's effect on new_max / new_tail, remembers it as processed, and moves
// on to the next entry through process().
2101 void finish_je(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
,
2102 const fifo::journal_entry
& entry
) {
2103 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2104 << " entering: tid=" << tid
<< dendl
;
2106 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2107 << " finishing entry: entry=" << entry
2108 << " tid=" << tid
<< dendl
;
// -ENOENT on a remove entry is special-cased (the branch body is not shown in
// this listing).
2110 if (entry
.op
== fifo::journal_entry::Op::remove
&& r
== -ENOENT
)
2114 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2115 << " processing entry failed: entry=" << entry
2116 << " r=" << r
<< " tid=" << tid
<< dendl
;
2117 complete(std::move(p
), r
);
2121 case fifo::journal_entry::Op::unknown
:
2122 case fifo::journal_entry::Op::set_head
:
2123 // Can't happen. Filtered out in process.
2124 complete(std::move(p
), -EIO
);
2127 case fifo::journal_entry::Op::create
:
2128 if (entry
.part_num
> new_max
) {
2129 new_max
= entry
.part_num
;
2132 case fifo::journal_entry::Op::remove
:
2133 if (entry
.part_num
>= new_tail
) {
2134 new_tail
= entry
.part_num
+ 1;
2138 processed
.push_back(entry
);
2141 process(dpp
, std::move(p
));
// Start the metadata write-back once all journal entries have been handled;
// completes immediately with 0 when no entry was processed.
2144 void postprocess(const DoutPrefixProvider
*dpp
, Ptr
&& p
) {
2145 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2146 << " entering: tid=" << tid
<< dendl
;
2147 if (processed
.empty()) {
2148 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2149 << " nothing to update any more: race_retries="
2150 << race_retries
<< " tid=" << tid
<< dendl
;
2151 complete(std::move(p
), 0);
2154 pp_run(dpp
, std::move(p
), 0, false);
// Snapshots the journal and the current tail/head/max part numbers under the
// FIFO mutex.
2159 JournalProcessor(const DoutPrefixProvider
*dpp
, FIFO
* fifo
, std::uint64_t tid
, lr::AioCompletion
* super
)
2160 : Completion(dpp
, super
), fifo(fifo
), tid(tid
) {
2161 std::unique_lock
l(fifo
->m
);
2162 journal
= fifo
->info
.journal
;
2163 iter
= journal
.begin();
2164 new_tail
= fifo
->info
.tail_part_num
;
2165 new_head
= fifo
->info
.head_part_num
;
2166 new_max
= fifo
->info
.max_push_part_num
;
// One round of the metadata write-back. `r` and `canceled` describe the
// outcome of the previous _update_meta() attempt (0 / false on the first
// call). Note that the `canceled` parameter shadows the member of the same
// name; the member is what _update_meta() writes into.
2169 void pp_run(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
, bool canceled
) {
2170 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2171 << " entering: tid=" << tid
<< dendl
;
// Left empty unless the corresponding value actually needs to advance.
2172 std::optional
<int64_t> tail_part_num
;
2173 std::optional
<int64_t> head_part_num
;
2174 std::optional
<int64_t> max_part_num
;
2177 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2178 << " failed, r=: " << r
<< " tid=" << tid
<< dendl
;
2179 complete(std::move(p
), r
);
2183 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2184 << " postprocessing: race_retries="
2185 << race_retries
<< " tid=" << tid
<< dendl
;
// A previous update went through without being canceled: done.
2187 if (!first_pp
&& r
== 0 && !canceled
) {
2188 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2189 << " nothing to update any more: race_retries="
2190 << race_retries
<< " tid=" << tid
<< dendl
;
2191 complete(std::move(p
), 0);
2198 if (race_retries
>= MAX_RACE_RETRIES
) {
2199 ldpp_dout(dpp
, -1) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2200 << " canceled too many times, giving up: tid="
2202 complete(std::move(p
), -ECANCELED
);
2205 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2206 << " update canceled, retrying: race_retries="
2207 << race_retries
<< " tid=" << tid
<< dendl
;
// Keep only the processed entries that are still present, unchanged, in the
// FIFO's current journal; the others no longer need to be removed.
2211 std::vector
<fifo::journal_entry
> new_processed
;
2212 std::unique_lock
l(fifo
->m
);
2213 for (auto& e
: processed
) {
2214 auto jiter
= fifo
->info
.journal
.find(e
.part_num
);
2215 /* journal entry was already processed */
2216 if (jiter
== fifo
->info
.journal
.end() ||
2217 !(jiter
->second
== e
)) {
2220 new_processed
.push_back(e
);
2222 processed
= std::move(new_processed
);
// Decide which part numbers still have to advance relative to the cached
// metadata.
2225 std::unique_lock
l(fifo
->m
);
2226 auto objv
= fifo
->info
.version
;
2227 if (new_tail
> fifo
->info
.tail_part_num
) {
2228 tail_part_num
= new_tail
;
2231 if (new_head
> fifo
->info
.head_part_num
) {
2232 head_part_num
= new_head
;
2235 if (new_max
> fifo
->info
.max_push_part_num
) {
2236 max_part_num
= new_max
;
2240 if (processed
.empty() &&
2243 /* nothing to update anymore */
2244 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2245 << " nothing to update any more: race_retries="
2246 << race_retries
<< " tid=" << tid
<< dendl
;
2247 complete(std::move(p
), 0);
// Issue the metadata update; the reply comes back through handle() with
// state == pp_callback.
2250 state
= pp_callback
;
2251 fifo
->_update_meta(dpp
, fifo::update
{}
2252 .tail_part_num(tail_part_num
)
2253 .head_part_num(head_part_num
)
2254 .max_push_part_num(max_part_num
)
2255 .journal_entries_rm(processed
),
2256 objv
, &this->canceled
, tid
, call(std::move(p
)));
// Neither copyable nor movable.
2260 JournalProcessor(const JournalProcessor
&) = delete;
2261 JournalProcessor
& operator =(const JournalProcessor
&) = delete;
2262 JournalProcessor(JournalProcessor
&&) = delete;
2263 JournalProcessor
& operator =(JournalProcessor
&&) = delete;
// Walk the journal copy from `iter`: create/remove entries start an
// asynchronous operation, set_head entries are applied locally to new_head
// and recorded as processed. When the end of the journal is reached,
// postprocess() takes over.
2265 void process(const DoutPrefixProvider
*dpp
, Ptr
&& p
) {
2266 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2267 << " entering: tid=" << tid
<< dendl
;
2268 while (iter
!= journal
.end()) {
2269 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2270 << " processing entry: entry=" << *iter
2271 << " tid=" << tid
<< dendl
;
2272 const auto entry
= iter
->second
;
2274 case fifo::journal_entry::Op::create
:
2275 create_part(std::move(p
), entry
.part_num
, entry
.part_tag
);
2277 case fifo::journal_entry::Op::set_head
:
2278 if (entry
.part_num
> new_head
) {
2279 new_head
= entry
.part_num
;
2281 processed
.push_back(entry
);
2284 case fifo::journal_entry::Op::remove
:
2285 remove_part(std::move(p
), entry
.part_num
, entry
.part_tag
);
2288 lderr(fifo
->cct
) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2289 << " unknown journaled op: entry=" << entry
<< " tid="
2291 complete(std::move(p
), -EIO
);
2295 postprocess(dpp
, std::move(p
));
// Completion entry point: dispatches on `state` to finish_je() (a journal
// entry operation finished) or pp_run() (a metadata update finished).
2299 void handle(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
) {
2300 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< ":" << __LINE__
2301 << " entering: tid=" << tid
<< dendl
;
2303 case entry_callback
:
2304 finish_je(dpp
, std::move(p
), r
, iter
->second
);
2309 pp_run(dpp
, std::move(p
), r
, c
);
2318 void FIFO::process_journal(const DoutPrefixProvider
*dpp
, std::uint64_t tid
, lr::AioCompletion
* c
) {
2319 auto p
= std::make_unique
<JournalProcessor
>(dpp
, this, tid
, c
);
2320 p
->process(dpp
, std::move(p
));
// Completion that lists entries of a FIFO.  It reads from one part at a time
// (see list()) and collects converted entries in `result` (see handle_list()).
2323 struct Lister
: Completion
<Lister
> {
// Entries collected so far; moved into *entries_out by complete().
2325 std::vector
<list_entry
> result
;
// Number of the part that is currently being listed.
2327 std::int64_t part_num
;
// Raw entries returned by the most recent list_part operation.
2331 std::vector
<fifo::part_list_entry
> entries
;
// Output flags of the most recent list_part operation; handle_list()
// combines them as `more = part_full || part_more`.
2332 bool part_more
= false;
2333 bool part_full
= false;
// Caller-supplied destination for the result; may be null (checked in
// complete()).
2334 std::vector
<list_entry
>* entries_out
;
// Finish the listing.
//
// Writes `more` to *more_out and moves `result` into *entries_out, each only
// if the respective pointer is non-null, then forwards to
// Completion::complete() with the result code.
//
// p: pointer to this Lister, handed on to Completion::complete().
// r: result code to report.
2340 void complete(Ptr
&& p
, int r
) {
2342 if (more_out
) *more_out
= more
;
// `result` is moved from here and must not be relied upon afterwards.
2343 if (entries_out
) *entries_out
= std::move(result
);
2345 Completion::complete(std::move(p
), r
);
// Construct a Lister.
//
// dpp, super:  passed to the Completion base.
// f:           the FIFO to list from.
// part_num:    part at which listing starts.
// ofs:         offset passed to list_part() (see list()).
// max_entries: maximum number of entries to return.
// entries_out: where complete() stores the collected entries (may be null).
// more_out:    where complete() stores the `more` flag (may be null).
// tid:         transaction id passed to read_meta() (see handle_list()).
2349 Lister(const DoutPrefixProvider
*dpp
, FIFO
* f
, std::int64_t part_num
, std::uint64_t ofs
, int max_entries
,
2350 std::vector
<list_entry
>* entries_out
, bool* more_out
,
2351 std::uint64_t tid
, lr::AioCompletion
* super
)
2352 : Completion(dpp
, super
), f(f
), part_num(part_num
), ofs(ofs
), max_entries(max_entries
),
2353 entries_out(entries_out
), more_out(more_out
), tid(tid
) {
// Pre-size the result vector for the requested number of entries.
// NOTE(review): max_entries is an int while reserve() takes an unsigned
// size; a negative value would convert to a huge request -- confirm that
// callers never pass one.
2354 result
.reserve(max_entries
);
// A Lister can be neither copied nor moved: the copy/move constructors and
// the copy/move assignment operators are all deleted.
2357 Lister(const Lister
&) = delete;
2358 Lister
& operator =(const Lister
&) = delete;
2359 Lister(Lister
&&) = delete;
2360 Lister
& operator =(Lister
&&) = delete;
// Callback entry point of the Lister: passes the result code `r` on to either
// handle_read() or handle_list().
//
// dpp: prefix provider, forwarded to handle_list().
// p:   pointer to this Lister, handed on to the handler that is called.
// r:   result code being delivered to this callback.
2362 void handle(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
) {
2364 handle_read(std::move(p
), r
);
2366 handle_list(dpp
, std::move(p
), r
);
// Issue the next list operation, or finish.
//
// While entries are still wanted (max_entries > 0) an asynchronous list_part
// operation is submitted against the object of part `part_num`, with this
// Lister as the callback.  The other visible path completes with result 0.
//
// p: pointer to this Lister; handed to call() for the async operation or to
//    complete().
2369 void list(Ptr
&& p
) {
2370 if (max_entries
> 0) {
// f->info is read under the FIFO's mutex.
2375 std::unique_lock
l(f
->m
);
2376 auto part_oid
= f
->info
.part_oid(part_num
);
// Build the read operation: up to max_entries entries starting at `ofs`.
// Its outputs are written to r_out, entries, part_more and part_full.
2380 auto op
= list_part(f
->cct
, {}, ofs
, max_entries
, &r_out
,
2381 &entries
, &part_more
, &part_full
,
// Submit asynchronously; call() wraps this Lister as the AIO callback.
2383 f
->ioctx
.aio_operate(part_oid
, call(std::move(p
)), &op
, nullptr);
// NOTE(review): presumably the max_entries <= 0 branch -- nothing left to
// fetch, so finish successfully.
2385 complete(std::move(p
), 0);
// Handler used by handle() for a read result.
// NOTE(review): presumably this is the continuation of the read_meta() call
// issued in handle_list() -- confirm.
//
// p: pointer to this Lister.
// r: result code delivered to the callback.
2389 void handle_read(Ptr
&& p
, int r
) {
// A non-negative callback result is replaced by the value stored in r_out.
2391 if (r
>= 0) r
= r_out
;
// Finish with the result code `r`.
2395 complete(std::move(p
), r
);
// The part we were listing now lies before the FIFO's tail part.
2399 if (part_num
< f
->info
.tail_part_num
) {
2400 /* raced with trim? restart */
// Add the number of entries already collected back onto the budget ...
2401 max_entries
+= result
.size();
// ... and continue from the current tail part.
2403 part_num
= f
->info
.tail_part_num
;
2408 /* assuming part was not written yet, so end of data */
2410 complete(std::move(p
), 0);
// Handler used by handle() for the result of a list_part operation.
//
// Converts the raw part entries in `entries` into list_entry values appended
// to `result`, reduces the remaining budget and decides how to continue.
//
// dpp: prefix provider, forwarded to read_meta().
// p:   pointer to this Lister.
// r:   result code delivered to the callback.
2414 void handle_list(const DoutPrefixProvider
*dpp
, Ptr
&& p
, int r
) {
// A non-negative callback result is replaced by the value stored in r_out.
2415 if (r
>= 0) r
= r_out
;
// f->info is read under the FIFO's mutex.
2417 std::unique_lock
l(f
->m
);
2418 auto part_oid
= f
->info
.part_oid(part_num
);
// Ask the FIFO to read its metadata; this Lister is the callback.
2422 f
->read_meta(dpp
, tid
, call(std::move(p
)));
// Finish with the result code `r`.
2426 complete(std::move(p
), r
);
// More data may exist if the part is full or reported more entries.
2430 more
= part_full
|| part_more
;
// Convert each raw part entry and append it to the result.
2431 for (auto& entry
: entries
) {
// The payload is moved, not copied, out of the raw entry.
2433 e
.data
= std::move(entry
.data
);
// The marker string is built from the part number and the entry's offset.
2434 e
.marker
= marker
{part_num
, entry
.ofs
}.to_string();
2435 e
.mtime
= entry
.mtime
;
2436 result
.push_back(std::move(e
));
// The entries just received count against the remaining budget.
2438 max_entries
-= entries
.size();
// Budget left and the part reported more entries.
2440 if (max_entries
> 0 && part_more
) {
2445 if (!part_full
) { /* head part is not full */
2446 complete(std::move(p
), 0);
// Asynchronously list entries of this FIFO.
//
// dpp:         prefix provider used for logging.
// max_entries: maximum number of entries to return.
// markstr:     optional marker string selecting where to start; without it
//              listing starts at the tail part, offset 0.
// out:         receives the listed entries.
// c:           AioCompletion handed to the Lister.
//
// A marker string that to_marker() cannot turn into a marker makes the
// operation complete with -EINVAL.
2455 void FIFO::list(const DoutPrefixProvider
*dpp
, int max_entries
,
2456 std::optional
<std::string_view
> markstr
,
2457 std::vector
<list_entry
>* out
,
2459 lr::AioCompletion
* c
) {
// next_tid and info are accessed under the FIFO's mutex.
2460 std::unique_lock
l(m
);
2461 auto tid
= ++next_tid
;
// Default starting position: the tail part ...
2462 std::int64_t part_num
= info
.tail_part_num
;
// ... at offset 0.
2464 std::uint64_t ofs
= 0;
2465 std::optional
<::rgw::cls::fifo::marker
> marker
;
// Parse the caller's marker string.
2468 marker
= to_marker(*markstr
);
// Start at the part named by the marker.
2470 part_num
= marker
->num
;
2475 auto ls
= std::make_unique
<Lister
>(dpp
, this, part_num
, ofs
, max_entries
, out
,
// A marker string was supplied but no marker was obtained from it.
2477 if (markstr
&& !marker
) {
2479 l
->complete(std::move(ls
), -EINVAL
);
// Start listing, passing the Lister's own owning pointer into list().
// std::move(ls) is only a cast and list() takes its argument by rvalue
// reference, so `ls->` is still valid when the call is made.
2481 ls
->list(std::move(ls
));