]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/cls_fifo_legacy.cc
import ceph pacific 16.2.5
[ceph.git] / ceph / src / rgw / cls_fifo_legacy.cc
CommitLineData
f67539c2
TL
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2020 Red Hat <contact@redhat.com>
 * Author: Adam C. Emerson
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */
15
16#include <cstdint>
17#include <numeric>
18#include <optional>
19#include <string_view>
20
21#undef FMT_HEADER_ONLY
22#define FMT_HEADER_ONLY 1
23#include <fmt/format.h>
24
25#include "include/rados/librados.hpp"
26
27#include "include/buffer.h"
28
29#include "common/async/yield_context.h"
30#include "common/random_string.h"
31
32#include "cls/fifo/cls_fifo_types.h"
33#include "cls/fifo/cls_fifo_ops.h"
34
35#include "cls_fifo_legacy.h"
36
37namespace rgw::cls::fifo {
38static constexpr auto dout_subsys = ceph_subsys_objclass;
39namespace cb = ceph::buffer;
40namespace fifo = rados::cls::fifo;
41
42using ceph::from_error_code;
43
44inline constexpr auto MAX_RACE_RETRIES = 10;
45
46void create_meta(lr::ObjectWriteOperation* op,
47 std::string_view id,
48 std::optional<fifo::objv> objv,
49 std::optional<std::string_view> oid_prefix,
50 bool exclusive,
51 std::uint64_t max_part_size,
52 std::uint64_t max_entry_size)
53{
54 fifo::op::create_meta cm;
55
56 cm.id = id;
57 cm.version = objv;
58 cm.oid_prefix = oid_prefix;
59 cm.max_part_size = max_part_size;
60 cm.max_entry_size = max_entry_size;
61 cm.exclusive = exclusive;
62
63 cb::list in;
64 encode(cm, in);
65 op->exec(fifo::op::CLASS, fifo::op::CREATE_META, in);
66}
67
b3b6e05e 68int get_meta(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid,
f67539c2
TL
69 std::optional<fifo::objv> objv, fifo::info* info,
70 std::uint32_t* part_header_size,
71 std::uint32_t* part_entry_overhead,
72 uint64_t tid, optional_yield y,
73 bool probe)
74{
75 lr::ObjectReadOperation op;
76 fifo::op::get_meta gm;
77 gm.version = objv;
78 cb::list in;
79 encode(gm, in);
80 cb::list bl;
81
82 op.exec(fifo::op::CLASS, fifo::op::GET_META, in,
83 &bl, nullptr);
b3b6e05e 84 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, nullptr, y);
f67539c2
TL
85 if (r >= 0) try {
86 fifo::op::get_meta_reply reply;
87 auto iter = bl.cbegin();
88 decode(reply, iter);
89 if (info) *info = std::move(reply.info);
90 if (part_header_size) *part_header_size = reply.part_header_size;
91 if (part_entry_overhead)
92 *part_entry_overhead = reply.part_entry_overhead;
93 } catch (const cb::error& err) {
b3b6e05e 94 ldpp_dout(dpp, -1)
f67539c2
TL
95 << __PRETTY_FUNCTION__ << ":" << __LINE__
96 << " decode failed: " << err.what()
97 << " tid=" << tid << dendl;
98 r = from_error_code(err.code());
99 } else if (!(probe && (r == -ENOENT || r == -ENODATA))) {
b3b6e05e 100 ldpp_dout(dpp, -1)
f67539c2
TL
101 << __PRETTY_FUNCTION__ << ":" << __LINE__
102 << " fifo::op::GET_META failed r=" << r << " tid=" << tid
103 << dendl;
104 }
105 return r;
106};
107
108namespace {
109void update_meta(lr::ObjectWriteOperation* op, const fifo::objv& objv,
110 const fifo::update& update)
111{
112 fifo::op::update_meta um;
113
114 um.version = objv;
115 um.tail_part_num = update.tail_part_num();
116 um.head_part_num = update.head_part_num();
117 um.min_push_part_num = update.min_push_part_num();
118 um.max_push_part_num = update.max_push_part_num();
119 um.journal_entries_add = std::move(update).journal_entries_add();
120 um.journal_entries_rm = std::move(update).journal_entries_rm();
121
122 cb::list in;
123 encode(um, in);
124 op->exec(fifo::op::CLASS, fifo::op::UPDATE_META, in);
125}
126
127void part_init(lr::ObjectWriteOperation* op, std::string_view tag,
128 fifo::data_params params)
129{
130 fifo::op::init_part ip;
131
132 ip.tag = tag;
133 ip.params = params;
134
135 cb::list in;
136 encode(ip, in);
137 op->exec(fifo::op::CLASS, fifo::op::INIT_PART, in);
138}
139
b3b6e05e 140int push_part(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid, std::string_view tag,
f67539c2
TL
141 std::deque<cb::list> data_bufs, std::uint64_t tid,
142 optional_yield y)
143{
144 lr::ObjectWriteOperation op;
145 fifo::op::push_part pp;
146
147 pp.tag = tag;
148 pp.data_bufs = data_bufs;
149 pp.total_len = 0;
150
151 for (const auto& bl : data_bufs)
152 pp.total_len += bl.length();
153
154 cb::list in;
155 encode(pp, in);
156 auto retval = 0;
157 op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in, nullptr, &retval);
b3b6e05e 158 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y, lr::OPERATION_RETURNVEC);
f67539c2 159 if (r < 0) {
b3b6e05e 160 ldpp_dout(dpp, -1)
f67539c2
TL
161 << __PRETTY_FUNCTION__ << ":" << __LINE__
162 << " fifo::op::PUSH_PART failed r=" << r
163 << " tid=" << tid << dendl;
164 return r;
165 }
166 if (retval < 0) {
b3b6e05e 167 ldpp_dout(dpp, -1)
f67539c2
TL
168 << __PRETTY_FUNCTION__ << ":" << __LINE__
169 << " error handling response retval=" << retval
170 << " tid=" << tid << dendl;
171 }
172 return retval;
173}
174
175void push_part(lr::IoCtx& ioctx, const std::string& oid, std::string_view tag,
176 std::deque<cb::list> data_bufs, std::uint64_t tid,
177 lr::AioCompletion* c)
178{
179 lr::ObjectWriteOperation op;
180 fifo::op::push_part pp;
181
182 pp.tag = tag;
183 pp.data_bufs = data_bufs;
184 pp.total_len = 0;
185
186 for (const auto& bl : data_bufs)
187 pp.total_len += bl.length();
188
189 cb::list in;
190 encode(pp, in);
191 op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in);
192 auto r = ioctx.aio_operate(oid, c, &op, lr::OPERATION_RETURNVEC);
193 ceph_assert(r >= 0);
194}
195
196void trim_part(lr::ObjectWriteOperation* op,
197 std::optional<std::string_view> tag,
198 std::uint64_t ofs, bool exclusive)
199{
200 fifo::op::trim_part tp;
201
202 tp.tag = tag;
203 tp.ofs = ofs;
204 tp.exclusive = exclusive;
205
206 cb::list in;
207 encode(tp, in);
208 op->exec(fifo::op::CLASS, fifo::op::TRIM_PART, in);
209}
210
b3b6e05e 211int list_part(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid,
f67539c2
TL
212 std::optional<std::string_view> tag, std::uint64_t ofs,
213 std::uint64_t max_entries,
214 std::vector<fifo::part_list_entry>* entries,
215 bool* more, bool* full_part, std::string* ptag,
216 std::uint64_t tid, optional_yield y)
217{
218 lr::ObjectReadOperation op;
219 fifo::op::list_part lp;
220
221 lp.tag = tag;
222 lp.ofs = ofs;
223 lp.max_entries = max_entries;
224
225 cb::list in;
226 encode(lp, in);
227 cb::list bl;
228 op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in, &bl, nullptr);
b3b6e05e 229 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, nullptr, y);
f67539c2
TL
230 if (r >= 0) try {
231 fifo::op::list_part_reply reply;
232 auto iter = bl.cbegin();
233 decode(reply, iter);
234 if (entries) *entries = std::move(reply.entries);
235 if (more) *more = reply.more;
236 if (full_part) *full_part = reply.full_part;
237 if (ptag) *ptag = reply.tag;
238 } catch (const cb::error& err) {
b3b6e05e 239 ldpp_dout(dpp, -1)
f67539c2
TL
240 << __PRETTY_FUNCTION__ << ":" << __LINE__
241 << " decode failed: " << err.what()
242 << " tid=" << tid << dendl;
243 r = from_error_code(err.code());
244 } else if (r != -ENOENT) {
b3b6e05e 245 ldpp_dout(dpp, -1)
f67539c2
TL
246 << __PRETTY_FUNCTION__ << ":" << __LINE__
247 << " fifo::op::LIST_PART failed r=" << r << " tid=" << tid
248 << dendl;
249 }
250 return r;
251}
252
253struct list_entry_completion : public lr::ObjectOperationCompletion {
254 CephContext* cct;
255 int* r_out;
256 std::vector<fifo::part_list_entry>* entries;
257 bool* more;
258 bool* full_part;
259 std::string* ptag;
260 std::uint64_t tid;
261
262 list_entry_completion(CephContext* cct, int* r_out, std::vector<fifo::part_list_entry>* entries,
263 bool* more, bool* full_part, std::string* ptag,
264 std::uint64_t tid)
265 : cct(cct), r_out(r_out), entries(entries), more(more),
266 full_part(full_part), ptag(ptag), tid(tid) {}
267 virtual ~list_entry_completion() = default;
268 void handle_completion(int r, bufferlist& bl) override {
269 if (r >= 0) try {
270 fifo::op::list_part_reply reply;
271 auto iter = bl.cbegin();
272 decode(reply, iter);
273 if (entries) *entries = std::move(reply.entries);
274 if (more) *more = reply.more;
275 if (full_part) *full_part = reply.full_part;
276 if (ptag) *ptag = reply.tag;
277 } catch (const cb::error& err) {
278 lderr(cct)
279 << __PRETTY_FUNCTION__ << ":" << __LINE__
280 << " decode failed: " << err.what()
281 << " tid=" << tid << dendl;
282 r = from_error_code(err.code());
283 } else if (r < 0) {
284 lderr(cct)
285 << __PRETTY_FUNCTION__ << ":" << __LINE__
286 << " fifo::op::LIST_PART failed r=" << r << " tid=" << tid
287 << dendl;
288 }
289 if (r_out) *r_out = r;
290 }
291};
292
293lr::ObjectReadOperation list_part(CephContext* cct,
294 std::optional<std::string_view> tag,
295 std::uint64_t ofs,
296 std::uint64_t max_entries,
297 int* r_out,
298 std::vector<fifo::part_list_entry>* entries,
299 bool* more, bool* full_part,
300 std::string* ptag, std::uint64_t tid)
301{
302 lr::ObjectReadOperation op;
303 fifo::op::list_part lp;
304
305 lp.tag = tag;
306 lp.ofs = ofs;
307 lp.max_entries = max_entries;
308
309 cb::list in;
310 encode(lp, in);
311 op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in,
312 new list_entry_completion(cct, r_out, entries, more, full_part,
313 ptag, tid));
314 return op;
315}
316
b3b6e05e 317int get_part_info(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid,
f67539c2
TL
318 fifo::part_header* header,
319 std::uint64_t tid, optional_yield y)
320{
321 lr::ObjectReadOperation op;
322 fifo::op::get_part_info gpi;
323
324 cb::list in;
325 cb::list bl;
326 encode(gpi, in);
327 op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in, &bl, nullptr);
b3b6e05e 328 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, nullptr, y);
f67539c2
TL
329 if (r >= 0) try {
330 fifo::op::get_part_info_reply reply;
331 auto iter = bl.cbegin();
332 decode(reply, iter);
333 if (header) *header = std::move(reply.header);
334 } catch (const cb::error& err) {
b3b6e05e 335 ldpp_dout(dpp, -1)
f67539c2
TL
336 << __PRETTY_FUNCTION__ << ":" << __LINE__
337 << " decode failed: " << err.what()
338 << " tid=" << tid << dendl;
339 r = from_error_code(err.code());
340 } else {
b3b6e05e 341 ldpp_dout(dpp, -1)
f67539c2
TL
342 << __PRETTY_FUNCTION__ << ":" << __LINE__
343 << " fifo::op::GET_PART_INFO failed r=" << r << " tid=" << tid
344 << dendl;
345 }
346 return r;
347}
348
349struct partinfo_completion : public lr::ObjectOperationCompletion {
350 CephContext* cct;
351 int* rp;
352 fifo::part_header* h;
353 std::uint64_t tid;
354 partinfo_completion(CephContext* cct, int* rp, fifo::part_header* h,
355 std::uint64_t tid) :
356 cct(cct), rp(rp), h(h), tid(tid) {
357 }
358 virtual ~partinfo_completion() = default;
359 void handle_completion(int r, bufferlist& bl) override {
360 if (r >= 0) try {
361 fifo::op::get_part_info_reply reply;
362 auto iter = bl.cbegin();
363 decode(reply, iter);
364 if (h) *h = std::move(reply.header);
365 } catch (const cb::error& err) {
366 r = from_error_code(err.code());
367 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
368 << " decode failed: " << err.what()
369 << " tid=" << tid << dendl;
370 } else {
371 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
372 << " fifo::op::GET_PART_INFO failed r=" << r << " tid=" << tid
373 << dendl;
374 }
375 if (rp) {
376 *rp = r;
377 }
378 }
379};
380
381lr::ObjectReadOperation get_part_info(CephContext* cct,
382 fifo::part_header* header,
383 std::uint64_t tid, int* r = 0)
384{
385 lr::ObjectReadOperation op;
386 fifo::op::get_part_info gpi;
387
388 cb::list in;
389 cb::list bl;
390 encode(gpi, in);
391 op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in,
392 new partinfo_completion(cct, r, header, tid));
393 return op;
394}
395}
396
397std::optional<marker> FIFO::to_marker(std::string_view s)
398{
399 marker m;
400 if (s.empty()) {
401 m.num = info.tail_part_num;
402 m.ofs = 0;
403 return m;
404 }
405
406 auto pos = s.find(':');
407 if (pos == string::npos) {
408 return std::nullopt;
409 }
410
411 auto num = s.substr(0, pos);
412 auto ofs = s.substr(pos + 1);
413
414 auto n = ceph::parse<decltype(m.num)>(num);
415 if (!n) {
416 return std::nullopt;
417 }
418 m.num = *n;
419 auto o = ceph::parse<decltype(m.ofs)>(ofs);
420 if (!o) {
421 return std::nullopt;
422 }
423 m.ofs = *o;
424 return m;
425}
426
427std::string FIFO::generate_tag() const
428{
429 static constexpr auto HEADER_TAG_SIZE = 16;
430 return gen_rand_alphanumeric_plain(static_cast<CephContext*>(ioctx.cct()),
431 HEADER_TAG_SIZE);
432}
433
434
435int FIFO::apply_update(fifo::info* info,
436 const fifo::objv& objv,
437 const fifo::update& update,
438 std::uint64_t tid)
439{
440 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
441 << " entering: tid=" << tid << dendl;
442 std::unique_lock l(m);
443 if (objv != info->version) {
444 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
445 << " version mismatch, canceling: tid=" << tid << dendl;
446 return -ECANCELED;
447 }
448 auto err = info->apply_update(update);
449 if (err) {
450 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
451 << " error applying update: " << *err << " tid=" << tid << dendl;
452 return -ECANCELED;
453 }
454
455 ++info->version.ver;
456
457 return {};
458}
459
b3b6e05e 460int FIFO::_update_meta(const DoutPrefixProvider *dpp, const fifo::update& update,
f67539c2
TL
461 fifo::objv version, bool* pcanceled,
462 std::uint64_t tid, optional_yield y)
463{
b3b6e05e 464 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
465 << " entering: tid=" << tid << dendl;
466 lr::ObjectWriteOperation op;
467 bool canceled = false;
468 update_meta(&op, info.version, update);
b3b6e05e 469 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y);
f67539c2
TL
470 if (r >= 0 || r == -ECANCELED) {
471 canceled = (r == -ECANCELED);
472 if (!canceled) {
473 r = apply_update(&info, version, update, tid);
474 if (r < 0) canceled = true;
475 }
476 if (canceled) {
b3b6e05e 477 r = read_meta(dpp, tid, y);
f67539c2
TL
478 canceled = r < 0 ? false : true;
479 }
480 }
481 if (pcanceled) *pcanceled = canceled;
482 if (canceled) {
b3b6e05e 483 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
484 << " canceled: tid=" << tid << dendl;
485 }
486 if (r < 0) {
b3b6e05e 487 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
488 << " returning error: r=" << r << " tid=" << tid << dendl;
489 }
490 return r;
491}
492
493struct Updater : public Completion<Updater> {
494 FIFO* fifo;
495 fifo::update update;
496 fifo::objv version;
497 bool reread = false;
498 bool* pcanceled = nullptr;
499 std::uint64_t tid;
b3b6e05e 500 Updater(const DoutPrefixProvider *dpp, FIFO* fifo, lr::AioCompletion* super,
f67539c2
TL
501 const fifo::update& update, fifo::objv version,
502 bool* pcanceled, std::uint64_t tid)
b3b6e05e 503 : Completion(dpp, super), fifo(fifo), update(update), version(version),
f67539c2
TL
504 pcanceled(pcanceled) {}
505
b3b6e05e
TL
506 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
507 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
508 << " entering: tid=" << tid << dendl;
509 if (reread)
510 handle_reread(std::move(p), r);
511 else
b3b6e05e 512 handle_update(dpp, std::move(p), r);
f67539c2
TL
513 }
514
b3b6e05e
TL
515 void handle_update(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
516 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
517 << " handling async update_meta: tid="
518 << tid << dendl;
519 if (r < 0 && r != -ECANCELED) {
b3b6e05e 520 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
521 << " update failed: r=" << r << " tid=" << tid << dendl;
522 complete(std::move(p), r);
523 return;
524 }
525 bool canceled = (r == -ECANCELED);
526 if (!canceled) {
527 int r = fifo->apply_update(&fifo->info, version, update, tid);
528 if (r < 0) {
b3b6e05e 529 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
530 << " update failed, marking canceled: r=" << r
531 << " tid=" << tid << dendl;
532 canceled = true;
533 }
534 }
535 if (canceled) {
536 reread = true;
b3b6e05e 537 fifo->read_meta(dpp, tid, call(std::move(p)));
f67539c2
TL
538 return;
539 }
540 if (pcanceled)
541 *pcanceled = false;
b3b6e05e 542 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
543 << " completing: tid=" << tid << dendl;
544 complete(std::move(p), 0);
545 }
546
547 void handle_reread(Ptr&& p, int r) {
548 ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
549 << " handling async read_meta: tid="
550 << tid << dendl;
551 if (r < 0 && pcanceled) {
552 *pcanceled = false;
553 } else if (r >= 0 && pcanceled) {
554 *pcanceled = true;
555 }
556 if (r < 0) {
557 lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
558 << " failed dispatching read_meta: r=" << r << " tid="
559 << tid << dendl;
560 } else {
561 ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
562 << " completing: tid=" << tid << dendl;
563 }
564 complete(std::move(p), r);
565 }
566};
567
b3b6e05e 568void FIFO::_update_meta(const DoutPrefixProvider *dpp, const fifo::update& update,
f67539c2
TL
569 fifo::objv version, bool* pcanceled,
570 std::uint64_t tid, lr::AioCompletion* c)
571{
b3b6e05e 572 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
573 << " entering: tid=" << tid << dendl;
574 lr::ObjectWriteOperation op;
575 update_meta(&op, info.version, update);
b3b6e05e 576 auto updater = std::make_unique<Updater>(dpp, this, c, update, version, pcanceled,
f67539c2
TL
577 tid);
578 auto r = ioctx.aio_operate(oid, Updater::call(std::move(updater)), &op);
579 assert(r >= 0);
580}
581
b3b6e05e 582int FIFO::create_part(const DoutPrefixProvider *dpp, int64_t part_num, std::string_view tag, std::uint64_t tid,
f67539c2
TL
583 optional_yield y)
584{
b3b6e05e 585 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
586 << " entering: tid=" << tid << dendl;
587 lr::ObjectWriteOperation op;
588 op.create(false); /* We don't need exclusivity, part_init ensures
589 we're creating from the same journal entry. */
590 std::unique_lock l(m);
591 part_init(&op, tag, info.params);
592 auto oid = info.part_oid(part_num);
593 l.unlock();
b3b6e05e 594 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y);
f67539c2 595 if (r < 0) {
b3b6e05e 596 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
597 << " part_init failed: r=" << r << " tid="
598 << tid << dendl;
599 }
600 return r;
601}
602
b3b6e05e 603int FIFO::remove_part(const DoutPrefixProvider *dpp, int64_t part_num, std::string_view tag, std::uint64_t tid,
f67539c2
TL
604 optional_yield y)
605{
b3b6e05e 606 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
607 << " entering: tid=" << tid << dendl;
608 lr::ObjectWriteOperation op;
609 op.remove();
610 std::unique_lock l(m);
611 auto oid = info.part_oid(part_num);
612 l.unlock();
b3b6e05e 613 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y);
f67539c2 614 if (r < 0) {
b3b6e05e 615 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
616 << " remove failed: r=" << r << " tid="
617 << tid << dendl;
618 }
619 return r;
620}
621
b3b6e05e 622int FIFO::process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y)
f67539c2 623{
b3b6e05e 624 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
625 << " entering: tid=" << tid << dendl;
626 std::vector<fifo::journal_entry> processed;
627
628 std::unique_lock l(m);
629 auto tmpjournal = info.journal;
630 auto new_tail = info.tail_part_num;
631 auto new_head = info.head_part_num;
632 auto new_max = info.max_push_part_num;
633 l.unlock();
634
635 int r = 0;
636 for (auto& [n, entry] : tmpjournal) {
b3b6e05e 637 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
638 << " processing entry: entry=" << entry << " tid=" << tid
639 << dendl;
640 switch (entry.op) {
641 case fifo::journal_entry::Op::create:
b3b6e05e 642 r = create_part(dpp, entry.part_num, entry.part_tag, tid, y);
f67539c2
TL
643 if (entry.part_num > new_max) {
644 new_max = entry.part_num;
645 }
646 break;
647 case fifo::journal_entry::Op::set_head:
648 r = 0;
649 if (entry.part_num > new_head) {
650 new_head = entry.part_num;
651 }
652 break;
653 case fifo::journal_entry::Op::remove:
b3b6e05e 654 r = remove_part(dpp, entry.part_num, entry.part_tag, tid, y);
f67539c2
TL
655 if (r == -ENOENT) r = 0;
656 if (entry.part_num >= new_tail) {
657 new_tail = entry.part_num + 1;
658 }
659 break;
660 default:
b3b6e05e 661 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
662 << " unknown journaled op: entry=" << entry << " tid="
663 << tid << dendl;
664 return -EIO;
665 }
666
667 if (r < 0) {
b3b6e05e 668 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
669 << " processing entry failed: entry=" << entry
670 << " r=" << r << " tid=" << tid << dendl;
671 return -r;
672 }
673
674 processed.push_back(std::move(entry));
675 }
676
677 // Postprocess
678 bool canceled = true;
679
680 for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) {
b3b6e05e 681 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
682 << " postprocessing: i=" << i << " tid=" << tid << dendl;
683
684 std::optional<int64_t> tail_part_num;
685 std::optional<int64_t> head_part_num;
686 std::optional<int64_t> max_part_num;
687
688 std::unique_lock l(m);
689 auto objv = info.version;
690 if (new_tail > tail_part_num) tail_part_num = new_tail;
691 if (new_head > info.head_part_num) head_part_num = new_head;
692 if (new_max > info.max_push_part_num) max_part_num = new_max;
693 l.unlock();
694
695 if (processed.empty() &&
696 !tail_part_num &&
697 !max_part_num) {
b3b6e05e 698 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
699 << " nothing to update any more: i=" << i << " tid="
700 << tid << dendl;
701 canceled = false;
702 break;
703 }
704 auto u = fifo::update().tail_part_num(tail_part_num)
705 .head_part_num(head_part_num).max_push_part_num(max_part_num)
706 .journal_entries_rm(processed);
b3b6e05e 707 r = _update_meta(dpp, u, objv, &canceled, tid, y);
f67539c2 708 if (r < 0) {
b3b6e05e 709 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
710 << " _update_meta failed: update=" << u
711 << " r=" << r << " tid=" << tid << dendl;
712 break;
713 }
714
715 if (canceled) {
716 std::vector<fifo::journal_entry> new_processed;
717 std::unique_lock l(m);
b3b6e05e 718 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
719 << " update canceled, retrying: i=" << i << " tid="
720 << tid << dendl;
721 for (auto& e : processed) {
722 auto jiter = info.journal.find(e.part_num);
723 /* journal entry was already processed */
724 if (jiter == info.journal.end() ||
725 !(jiter->second == e)) {
726 continue;
727 }
728 new_processed.push_back(e);
729 }
730 processed = std::move(new_processed);
731 }
732 }
733 if (r == 0 && canceled) {
b3b6e05e 734 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
735 << " canceled too many times, giving up: tid=" << tid << dendl;
736 r = -ECANCELED;
737 }
738 if (r < 0) {
b3b6e05e 739 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
740 << " failed, r=: " << r << " tid=" << tid << dendl;
741 }
742 return r;
743}
744
b3b6e05e 745int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::uint64_t tid, optional_yield y)
f67539c2 746{
b3b6e05e 747 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
748 << " entering: tid=" << tid << dendl;
749 std::unique_lock l(m);
750 std::vector jentries = { info.next_journal_entry(generate_tag()) };
751 if (info.journal.find(jentries.front().part_num) != info.journal.end()) {
752 l.unlock();
b3b6e05e 753 ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
754 << " new part journaled, but not processed: tid="
755 << tid << dendl;
b3b6e05e 756 auto r = process_journal(dpp, tid, y);
f67539c2 757 if (r < 0) {
b3b6e05e 758 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
759 << " process_journal failed: r=" << r << " tid=" << tid << dendl;
760 }
761 return r;
762 }
763 std::int64_t new_head_part_num = info.head_part_num;
764 auto version = info.version;
765
766 if (is_head) {
b3b6e05e 767 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
768 << " needs new head: tid=" << tid << dendl;
769 auto new_head_jentry = jentries.front();
770 new_head_jentry.op = fifo::journal_entry::Op::set_head;
771 new_head_part_num = jentries.front().part_num;
772 jentries.push_back(std::move(new_head_jentry));
773 }
774 l.unlock();
775
776 int r = 0;
777 bool canceled = true;
778 for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) {
779 canceled = false;
b3b6e05e 780 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
781 << " updating metadata: i=" << i << " tid=" << tid << dendl;
782 auto u = fifo::update{}.journal_entries_add(jentries);
b3b6e05e 783 r = _update_meta(dpp, u, version, &canceled, tid, y);
f67539c2
TL
784 if (r >= 0 && canceled) {
785 std::unique_lock l(m);
786 auto found = (info.journal.find(jentries.front().part_num) !=
787 info.journal.end());
788 if ((info.max_push_part_num >= jentries.front().part_num &&
789 info.head_part_num >= new_head_part_num)) {
b3b6e05e 790 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
791 << " raced, but journaled and processed: i=" << i
792 << " tid=" << tid << dendl;
793 return 0;
794 }
795 if (found) {
b3b6e05e 796 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
797 << " raced, journaled but not processed: i=" << i
798 << " tid=" << tid << dendl;
799 canceled = false;
800 }
801 l.unlock();
802 }
803 if (r < 0) {
b3b6e05e 804 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
805 << " _update_meta failed: update=" << u << " r=" << r
806 << " tid=" << tid << dendl;
807 return r;
808 }
809 }
810 if (canceled) {
b3b6e05e 811 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
812 << " canceled too many times, giving up: tid=" << tid << dendl;
813 return -ECANCELED;
814 }
b3b6e05e 815 r = process_journal(dpp, tid, y);
f67539c2 816 if (r < 0) {
b3b6e05e 817 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
818 << " process_journal failed: r=" << r << " tid=" << tid << dendl;
819 }
820 return r;
821}
822
b3b6e05e 823int FIFO::_prepare_new_head(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y)
f67539c2 824{
b3b6e05e 825 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
826 << " entering: tid=" << tid << dendl;
827 std::unique_lock l(m);
828 std::int64_t new_head_num = info.head_part_num + 1;
829 auto max_push_part_num = info.max_push_part_num;
830 auto version = info.version;
831 l.unlock();
832
833 int r = 0;
834 if (max_push_part_num < new_head_num) {
b3b6e05e 835 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 836 << " need new part: tid=" << tid << dendl;
b3b6e05e 837 r = _prepare_new_part(dpp, true, tid, y);
f67539c2 838 if (r < 0) {
b3b6e05e 839 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
840 << " _prepare_new_part failed: r=" << r
841 << " tid=" << tid << dendl;
842 return r;
843 }
844 std::unique_lock l(m);
845 if (info.max_push_part_num < new_head_num) {
b3b6e05e 846 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
847 << " inconsistency, push part less than head part: "
848 << " tid=" << tid << dendl;
849 return -EIO;
850 }
851 l.unlock();
852 return 0;
853 }
854
855 bool canceled = true;
856 for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) {
b3b6e05e 857 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
858 << " updating head: i=" << i << " tid=" << tid << dendl;
859 auto u = fifo::update{}.head_part_num(new_head_num);
b3b6e05e 860 r = _update_meta(dpp, u, version, &canceled, tid, y);
f67539c2 861 if (r < 0) {
b3b6e05e 862 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
863 << " _update_meta failed: update=" << u << " r=" << r
864 << " tid=" << tid << dendl;
865 return r;
866 }
867 std::unique_lock l(m);
868 auto head_part_num = info.head_part_num;
869 version = info.version;
870 l.unlock();
871 if (canceled && (head_part_num >= new_head_num)) {
b3b6e05e 872 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
873 << " raced, but completed by the other caller: i=" << i
874 << " tid=" << tid << dendl;
875 canceled = false;
876 }
877 }
878 if (canceled) {
b3b6e05e 879 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
880 << " canceled too many times, giving up: tid=" << tid << dendl;
881 return -ECANCELED;
882 }
883 return 0;
884}
885
886struct NewPartPreparer : public Completion<NewPartPreparer> {
887 FIFO* f;
888 std::vector<fifo::journal_entry> jentries;
889 int i = 0;
890 std::int64_t new_head_part_num;
891 bool canceled = false;
892 uint64_t tid;
893
b3b6e05e 894 NewPartPreparer(const DoutPrefixProvider *dpp, FIFO* f, lr::AioCompletion* super,
f67539c2
TL
895 std::vector<fifo::journal_entry> jentries,
896 std::int64_t new_head_part_num,
897 std::uint64_t tid)
b3b6e05e 898 : Completion(dpp, super), f(f), jentries(std::move(jentries)),
f67539c2
TL
899 new_head_part_num(new_head_part_num), tid(tid) {}
900
b3b6e05e
TL
901 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
902 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
903 << " entering: tid=" << tid << dendl;
904 if (r < 0) {
b3b6e05e 905 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
906 << " _update_meta failed: r=" << r
907 << " tid=" << tid << dendl;
908 complete(std::move(p), r);
909 return;
910 }
911
912 if (canceled) {
913 std::unique_lock l(f->m);
914 auto iter = f->info.journal.find(jentries.front().part_num);
915 auto max_push_part_num = f->info.max_push_part_num;
916 auto head_part_num = f->info.head_part_num;
917 auto version = f->info.version;
918 auto found = (iter != f->info.journal.end());
919 l.unlock();
920 if ((max_push_part_num >= jentries.front().part_num &&
921 head_part_num >= new_head_part_num)) {
b3b6e05e 922 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
923 << " raced, but journaled and processed: i=" << i
924 << " tid=" << tid << dendl;
925 complete(std::move(p), 0);
926 return;
927 }
928 if (i >= MAX_RACE_RETRIES) {
929 complete(std::move(p), -ECANCELED);
930 return;
931 }
932 if (!found) {
933 ++i;
b3b6e05e 934 f->_update_meta(dpp, fifo::update{}
f67539c2
TL
935 .journal_entries_add(jentries),
936 version, &canceled, tid, call(std::move(p)));
937 return;
938 } else {
b3b6e05e 939 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
940 << " raced, journaled but not processed: i=" << i
941 << " tid=" << tid << dendl;
942 canceled = false;
943 }
944 // Fall through. We still need to process the journal.
945 }
b3b6e05e 946 f->process_journal(dpp, tid, super());
f67539c2
TL
947 return;
948 }
949};
950
b3b6e05e 951void FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::uint64_t tid,
f67539c2
TL
952 lr::AioCompletion* c)
953{
954 std::unique_lock l(m);
955 std::vector jentries = { info.next_journal_entry(generate_tag()) };
956 if (info.journal.find(jentries.front().part_num) != info.journal.end()) {
957 l.unlock();
b3b6e05e 958 ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
959 << " new part journaled, but not processed: tid="
960 << tid << dendl;
b3b6e05e 961 process_journal(dpp, tid, c);
f67539c2
TL
962 return;
963 }
964 std::int64_t new_head_part_num = info.head_part_num;
965 auto version = info.version;
966
967 if (is_head) {
968 auto new_head_jentry = jentries.front();
969 new_head_jentry.op = fifo::journal_entry::Op::set_head;
970 new_head_part_num = jentries.front().part_num;
971 jentries.push_back(std::move(new_head_jentry));
972 }
973 l.unlock();
974
b3b6e05e 975 auto n = std::make_unique<NewPartPreparer>(dpp, this, c, jentries,
f67539c2
TL
976 new_head_part_num, tid);
977 auto np = n.get();
b3b6e05e 978 _update_meta(dpp, fifo::update{}.journal_entries_add(jentries), version,
f67539c2
TL
979 &np->canceled, tid, NewPartPreparer::call(std::move(n)));
980}
981
982struct NewHeadPreparer : public Completion<NewHeadPreparer> {
983 FIFO* f;
984 int i = 0;
985 bool newpart;
986 std::int64_t new_head_num;
987 bool canceled = false;
988 std::uint64_t tid;
989
b3b6e05e 990 NewHeadPreparer(const DoutPrefixProvider *dpp, FIFO* f, lr::AioCompletion* super,
f67539c2 991 bool newpart, std::int64_t new_head_num, std::uint64_t tid)
b3b6e05e 992 : Completion(dpp, super), f(f), newpart(newpart), new_head_num(new_head_num),
f67539c2
TL
993 tid(tid) {}
994
b3b6e05e 995 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
f67539c2
TL
996 if (newpart)
997 handle_newpart(std::move(p), r);
998 else
b3b6e05e 999 handle_update(dpp, std::move(p), r);
f67539c2
TL
1000 }
1001
1002 void handle_newpart(Ptr&& p, int r) {
1003 if (r < 0) {
1004 lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1005 << " _prepare_new_part failed: r=" << r
1006 << " tid=" << tid << dendl;
1007 complete(std::move(p), r);
1008 return;
1009 }
1010 std::unique_lock l(f->m);
1011 if (f->info.max_push_part_num < new_head_num) {
1012 l.unlock();
1013 lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1014 << " _prepare_new_part failed: r=" << r
1015 << " tid=" << tid << dendl;
1016 complete(std::move(p), -EIO);
1017 } else {
1018 l.unlock();
1019 complete(std::move(p), 0);
1020 }
1021 }
1022
b3b6e05e 1023 void handle_update(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
f67539c2
TL
1024 std::unique_lock l(f->m);
1025 auto head_part_num = f->info.head_part_num;
1026 auto version = f->info.version;
1027 l.unlock();
1028
1029 if (r < 0) {
b3b6e05e 1030 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1031 << " _update_meta failed: r=" << r
1032 << " tid=" << tid << dendl;
1033 complete(std::move(p), r);
1034 return;
1035 }
1036 if (canceled) {
1037 if (i >= MAX_RACE_RETRIES) {
b3b6e05e 1038 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1039 << " canceled too many times, giving up: tid=" << tid << dendl;
1040 complete(std::move(p), -ECANCELED);
1041 return;
1042 }
1043
1044 // Raced, but there's still work to do!
1045 if (head_part_num < new_head_num) {
1046 canceled = false;
1047 ++i;
b3b6e05e 1048 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 1049 << " updating head: i=" << i << " tid=" << tid << dendl;
b3b6e05e 1050 f->_update_meta(dpp, fifo::update{}.head_part_num(new_head_num),
f67539c2
TL
1051 version, &this->canceled, tid, call(std::move(p)));
1052 return;
1053 }
1054 }
b3b6e05e 1055 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1056 << " succeeded : i=" << i << " tid=" << tid << dendl;
1057 complete(std::move(p), 0);
1058 return;
1059 }
1060};
1061
b3b6e05e 1062void FIFO::_prepare_new_head(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c)
f67539c2 1063{
b3b6e05e 1064 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1065 << " entering: tid=" << tid << dendl;
1066 std::unique_lock l(m);
1067 int64_t new_head_num = info.head_part_num + 1;
1068 auto max_push_part_num = info.max_push_part_num;
1069 auto version = info.version;
1070 l.unlock();
1071
1072 if (max_push_part_num < new_head_num) {
b3b6e05e 1073 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 1074 << " need new part: tid=" << tid << dendl;
b3b6e05e 1075 auto n = std::make_unique<NewHeadPreparer>(dpp, this, c, true, new_head_num,
f67539c2 1076 tid);
b3b6e05e 1077 _prepare_new_part(dpp, true, tid, NewHeadPreparer::call(std::move(n)));
f67539c2 1078 } else {
b3b6e05e 1079 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 1080 << " updating head: tid=" << tid << dendl;
b3b6e05e 1081 auto n = std::make_unique<NewHeadPreparer>(dpp, this, c, false, new_head_num,
f67539c2
TL
1082 tid);
1083 auto np = n.get();
b3b6e05e 1084 _update_meta(dpp, fifo::update{}.head_part_num(new_head_num), version,
f67539c2
TL
1085 &np->canceled, tid, NewHeadPreparer::call(std::move(n)));
1086 }
1087}
1088
b3b6e05e 1089int FIFO::push_entries(const DoutPrefixProvider *dpp, const std::deque<cb::list>& data_bufs,
f67539c2
TL
1090 std::uint64_t tid, optional_yield y)
1091{
b3b6e05e 1092 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1093 << " entering: tid=" << tid << dendl;
1094 std::unique_lock l(m);
1095 auto head_part_num = info.head_part_num;
1096 auto tag = info.head_tag;
1097 const auto part_oid = info.part_oid(head_part_num);
1098 l.unlock();
1099
b3b6e05e 1100 auto r = push_part(dpp, ioctx, part_oid, tag, data_bufs, tid, y);
f67539c2 1101 if (r < 0) {
b3b6e05e 1102 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1103 << " push_part failed: r=" << r << " tid=" << tid << dendl;
1104 }
1105 return r;
1106}
1107
1108void FIFO::push_entries(const std::deque<cb::list>& data_bufs,
1109 std::uint64_t tid, lr::AioCompletion* c)
1110{
1111 std::unique_lock l(m);
1112 auto head_part_num = info.head_part_num;
1113 auto tag = info.head_tag;
1114 const auto part_oid = info.part_oid(head_part_num);
1115 l.unlock();
1116
1117 push_part(ioctx, part_oid, tag, data_bufs, tid, c);
1118}
1119
b3b6e05e 1120int FIFO::trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs,
f67539c2
TL
1121 std::optional<std::string_view> tag,
1122 bool exclusive, std::uint64_t tid,
1123 optional_yield y)
1124{
b3b6e05e 1125 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1126 << " entering: tid=" << tid << dendl;
1127 lr::ObjectWriteOperation op;
1128 std::unique_lock l(m);
1129 const auto part_oid = info.part_oid(part_num);
1130 l.unlock();
1131 rgw::cls::fifo::trim_part(&op, tag, ofs, exclusive);
b3b6e05e 1132 auto r = rgw_rados_operate(dpp, ioctx, part_oid, &op, y);
f67539c2 1133 if (r < 0) {
b3b6e05e 1134 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1135 << " trim_part failed: r=" << r << " tid=" << tid << dendl;
1136 }
1137 return 0;
1138}
1139
1140void FIFO::trim_part(int64_t part_num, uint64_t ofs,
1141 std::optional<std::string_view> tag,
1142 bool exclusive, std::uint64_t tid,
1143 lr::AioCompletion* c)
1144{
1145 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1146 << " entering: tid=" << tid << dendl;
1147 lr::ObjectWriteOperation op;
1148 std::unique_lock l(m);
1149 const auto part_oid = info.part_oid(part_num);
1150 l.unlock();
1151 rgw::cls::fifo::trim_part(&op, tag, ofs, exclusive);
1152 auto r = ioctx.aio_operate(part_oid, c, &op);
1153 ceph_assert(r >= 0);
1154}
1155
b3b6e05e 1156int FIFO::open(const DoutPrefixProvider *dpp, lr::IoCtx ioctx, std::string oid, std::unique_ptr<FIFO>* fifo,
f67539c2
TL
1157 optional_yield y, std::optional<fifo::objv> objv,
1158 bool probe)
1159{
b3b6e05e 1160 ldpp_dout(dpp, 20)
f67539c2
TL
1161 << __PRETTY_FUNCTION__ << ":" << __LINE__
1162 << " entering" << dendl;
1163 fifo::info info;
1164 std::uint32_t size;
1165 std::uint32_t over;
b3b6e05e 1166 int r = get_meta(dpp, ioctx, std::move(oid), objv, &info, &size, &over, 0, y,
f67539c2
TL
1167 probe);
1168 if (r < 0) {
1169 if (!(probe && (r == -ENOENT || r == -ENODATA))) {
b3b6e05e 1170 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1171 << " get_meta failed: r=" << r << dendl;
1172 }
1173 return r;
1174 }
1175 std::unique_ptr<FIFO> f(new FIFO(std::move(ioctx), oid));
1176 f->info = info;
1177 f->part_header_size = size;
1178 f->part_entry_overhead = over;
1179 // If there are journal entries, process them, in case
1180 // someone crashed mid-transaction.
1181 if (!info.journal.empty()) {
b3b6e05e 1182 ldpp_dout(dpp, 20)
f67539c2
TL
1183 << __PRETTY_FUNCTION__ << ":" << __LINE__
1184 << " processing leftover journal" << dendl;
b3b6e05e 1185 r = f->process_journal(dpp, 0, y);
f67539c2 1186 if (r < 0) {
b3b6e05e 1187 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1188 << " process_journal failed: r=" << r << dendl;
1189 return r;
1190 }
1191 }
1192 *fifo = std::move(f);
1193 return 0;
1194}
1195
b3b6e05e 1196int FIFO::create(const DoutPrefixProvider *dpp, lr::IoCtx ioctx, std::string oid, std::unique_ptr<FIFO>* fifo,
f67539c2
TL
1197 optional_yield y, std::optional<fifo::objv> objv,
1198 std::optional<std::string_view> oid_prefix,
1199 bool exclusive, std::uint64_t max_part_size,
1200 std::uint64_t max_entry_size)
1201{
b3b6e05e 1202 ldpp_dout(dpp, 20)
f67539c2
TL
1203 << __PRETTY_FUNCTION__ << ":" << __LINE__
1204 << " entering" << dendl;
1205 lr::ObjectWriteOperation op;
1206 create_meta(&op, oid, objv, oid_prefix, exclusive, max_part_size,
1207 max_entry_size);
b3b6e05e 1208 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y);
f67539c2 1209 if (r < 0) {
b3b6e05e 1210 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1211 << " create_meta failed: r=" << r << dendl;
1212 return r;
1213 }
b3b6e05e 1214 r = open(dpp, std::move(ioctx), std::move(oid), fifo, y, objv);
f67539c2
TL
1215 return r;
1216}
1217
b3b6e05e
TL
1218int FIFO::read_meta(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y) {
1219 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1220 << " entering: tid=" << tid << dendl;
1221 fifo::info _info;
1222 std::uint32_t _phs;
1223 std::uint32_t _peo;
1224
b3b6e05e 1225 auto r = get_meta(dpp, ioctx, oid, nullopt, &_info, &_phs, &_peo, tid, y);
f67539c2 1226 if (r < 0) {
b3b6e05e 1227 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1228 << " get_meta failed: r=" << r << " tid=" << tid << dendl;
1229 return r;
1230 }
1231 std::unique_lock l(m);
1232 // We have a newer version already!
1233 if (_info.version.same_or_later(this->info.version)) {
1234 info = std::move(_info);
1235 part_header_size = _phs;
1236 part_entry_overhead = _peo;
1237 }
1238 return 0;
1239}
1240
b3b6e05e 1241int FIFO::read_meta(const DoutPrefixProvider *dpp, optional_yield y) {
f67539c2
TL
1242 std::unique_lock l(m);
1243 auto tid = ++next_tid;
1244 l.unlock();
b3b6e05e 1245 return read_meta(dpp, tid, y);
f67539c2
TL
1246}
1247
1248struct Reader : public Completion<Reader> {
1249 FIFO* fifo;
1250 cb::list bl;
1251 std::uint64_t tid;
b3b6e05e
TL
1252 Reader(const DoutPrefixProvider *dpp, FIFO* fifo, lr::AioCompletion* super, std::uint64_t tid)
1253 : Completion(dpp, super), fifo(fifo), tid(tid) {}
f67539c2 1254
b3b6e05e
TL
1255 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
1256 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1257 << " entering: tid=" << tid << dendl;
1258 if (r >= 0) try {
1259 fifo::op::get_meta_reply reply;
1260 auto iter = bl.cbegin();
1261 decode(reply, iter);
1262 std::unique_lock l(fifo->m);
1263 if (reply.info.version.same_or_later(fifo->info.version)) {
1264 fifo->info = std::move(reply.info);
1265 fifo->part_header_size = reply.part_header_size;
1266 fifo->part_entry_overhead = reply.part_entry_overhead;
1267 }
1268 } catch (const cb::error& err) {
b3b6e05e 1269 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1270 << " failed to decode response err=" << err.what()
1271 << " tid=" << tid << dendl;
1272 r = from_error_code(err.code());
1273 } else {
b3b6e05e 1274 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1275 << " read_meta failed r=" << r
1276 << " tid=" << tid << dendl;
1277 }
1278 complete(std::move(p), r);
1279 }
1280};
1281
b3b6e05e 1282void FIFO::read_meta(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c)
f67539c2 1283{
b3b6e05e 1284 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1285 << " entering: tid=" << tid << dendl;
1286 lr::ObjectReadOperation op;
1287 fifo::op::get_meta gm;
1288 cb::list in;
1289 encode(gm, in);
b3b6e05e 1290 auto reader = std::make_unique<Reader>(dpp, this, c, tid);
f67539c2
TL
1291 auto rp = reader.get();
1292 auto r = ioctx.aio_exec(oid, Reader::call(std::move(reader)), fifo::op::CLASS,
1293 fifo::op::GET_META, in, &rp->bl);
1294 assert(r >= 0);
1295}
1296
1297const fifo::info& FIFO::meta() const {
1298 return info;
1299}
1300
1301std::pair<std::uint32_t, std::uint32_t> FIFO::get_part_layout_info() const {
1302 return {part_header_size, part_entry_overhead};
1303}
1304
b3b6e05e
TL
1305int FIFO::push(const DoutPrefixProvider *dpp, const cb::list& bl, optional_yield y) {
1306 return push(dpp, std::vector{ bl }, y);
f67539c2
TL
1307}
1308
b3b6e05e
TL
1309void FIFO::push(const DoutPrefixProvider *dpp, const cb::list& bl, lr::AioCompletion* c) {
1310 push(dpp, std::vector{ bl }, c);
f67539c2
TL
1311}
1312
b3b6e05e 1313int FIFO::push(const DoutPrefixProvider *dpp, const std::vector<cb::list>& data_bufs, optional_yield y)
f67539c2
TL
1314{
1315 std::unique_lock l(m);
1316 auto tid = ++next_tid;
1317 auto max_entry_size = info.params.max_entry_size;
1318 auto need_new_head = info.need_new_head();
1319 l.unlock();
b3b6e05e 1320 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1321 << " entering: tid=" << tid << dendl;
1322 if (data_bufs.empty()) {
b3b6e05e 1323 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1324 << " empty push, returning success tid=" << tid << dendl;
1325 return 0;
1326 }
1327
1328 // Validate sizes
1329 for (const auto& bl : data_bufs) {
1330 if (bl.length() > max_entry_size) {
b3b6e05e 1331 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1332 << " entry bigger than max_entry_size tid=" << tid << dendl;
1333 return -E2BIG;
1334 }
1335 }
1336
1337 int r = 0;
1338 if (need_new_head) {
b3b6e05e 1339 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 1340 << " need new head tid=" << tid << dendl;
b3b6e05e 1341 r = _prepare_new_head(dpp, tid, y);
f67539c2 1342 if (r < 0) {
b3b6e05e 1343 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1344 << " _prepare_new_head failed: r=" << r
1345 << " tid=" << tid << dendl;
1346 return r;
1347 }
1348 }
1349
1350 std::deque<cb::list> remaining(data_bufs.begin(), data_bufs.end());
1351 std::deque<cb::list> batch;
1352
1353 uint64_t batch_len = 0;
1354 auto retries = 0;
1355 bool canceled = true;
1356 while ((!remaining.empty() || !batch.empty()) &&
1357 (retries <= MAX_RACE_RETRIES)) {
b3b6e05e 1358 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1359 << " preparing push: remaining=" << remaining.size()
1360 << " batch=" << batch.size() << " retries=" << retries
1361 << " tid=" << tid << dendl;
1362 std::unique_lock l(m);
1363 auto max_part_size = info.params.max_part_size;
1364 auto overhead = part_entry_overhead;
1365 l.unlock();
1366
1367 while (!remaining.empty() &&
1368 (remaining.front().length() + batch_len <= max_part_size)) {
1369 /* We can send entries with data_len up to max_entry_size,
1370 however, we want to also account the overhead when
1371 dealing with multiple entries. Previous check doesn't
1372 account for overhead on purpose. */
1373 batch_len += remaining.front().length() + overhead;
1374 batch.push_back(std::move(remaining.front()));
1375 remaining.pop_front();
1376 }
b3b6e05e 1377 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1378 << " prepared push: remaining=" << remaining.size()
1379 << " batch=" << batch.size() << " retries=" << retries
1380 << " batch_len=" << batch_len
1381 << " tid=" << tid << dendl;
1382
b3b6e05e 1383 auto r = push_entries(dpp, batch, tid, y);
f67539c2
TL
1384 if (r == -ERANGE) {
1385 canceled = true;
1386 ++retries;
b3b6e05e 1387 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 1388 << " need new head tid=" << tid << dendl;
b3b6e05e 1389 r = _prepare_new_head(dpp, tid, y);
f67539c2 1390 if (r < 0) {
b3b6e05e 1391 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1392 << " prepare_new_head failed: r=" << r
1393 << " tid=" << tid << dendl;
1394 return r;
1395 }
1396 r = 0;
1397 continue;
1398 }
1399 if (r < 0) {
b3b6e05e 1400 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1401 << " push_entries failed: r=" << r
1402 << " tid=" << tid << dendl;
1403 return r;
1404 }
1405 // Made forward progress!
1406 canceled = false;
1407 retries = 0;
1408 batch_len = 0;
1409 if (static_cast<unsigned>(r) == batch.size()) {
1410 batch.clear();
1411 } else {
1412 batch.erase(batch.begin(), batch.begin() + r);
1413 for (const auto& b : batch) {
1414 batch_len += b.length() + part_entry_overhead;
1415 }
1416 }
1417 }
1418 if (canceled) {
b3b6e05e 1419 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1420 << " canceled too many times, giving up: tid=" << tid << dendl;
1421 return -ECANCELED;
1422 }
1423 return 0;
1424}
1425
1426struct Pusher : public Completion<Pusher> {
1427 FIFO* f;
1428 std::deque<cb::list> remaining;
1429 std::deque<cb::list> batch;
1430 int i = 0;
1431 std::uint64_t tid;
1432 bool new_heading = false;
1433
1434 void prep_then_push(Ptr&& p, const unsigned successes) {
1435 std::unique_lock l(f->m);
1436 auto max_part_size = f->info.params.max_part_size;
1437 auto part_entry_overhead = f->part_entry_overhead;
1438 l.unlock();
1439
1440 ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1441 << " preparing push: remaining=" << remaining.size()
1442 << " batch=" << batch.size() << " i=" << i
1443 << " tid=" << tid << dendl;
1444
1445 uint64_t batch_len = 0;
1446 if (successes > 0) {
1447 if (successes == batch.size()) {
1448 batch.clear();
1449 } else {
1450 batch.erase(batch.begin(), batch.begin() + successes);
1451 for (const auto& b : batch) {
1452 batch_len += b.length() + part_entry_overhead;
1453 }
1454 }
1455 }
1456
1457 if (batch.empty() && remaining.empty()) {
1458 complete(std::move(p), 0);
1459 return;
1460 }
1461
1462 while (!remaining.empty() &&
1463 (remaining.front().length() + batch_len <= max_part_size)) {
1464
1465 /* We can send entries with data_len up to max_entry_size,
1466 however, we want to also account the overhead when
1467 dealing with multiple entries. Previous check doesn't
1468 account for overhead on purpose. */
1469 batch_len += remaining.front().length() + part_entry_overhead;
1470 batch.push_back(std::move(remaining.front()));
1471 remaining.pop_front();
1472 }
1473 ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1474 << " prepared push: remaining=" << remaining.size()
1475 << " batch=" << batch.size() << " i=" << i
1476 << " batch_len=" << batch_len
1477 << " tid=" << tid << dendl;
1478 push(std::move(p));
1479 }
1480
1481 void push(Ptr&& p) {
1482 f->push_entries(batch, tid, call(std::move(p)));
1483 }
1484
b3b6e05e 1485 void new_head(const DoutPrefixProvider *dpp, Ptr&& p) {
f67539c2 1486 new_heading = true;
b3b6e05e 1487 f->_prepare_new_head(dpp, tid, call(std::move(p)));
f67539c2
TL
1488 }
1489
b3b6e05e 1490 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
f67539c2
TL
1491 if (!new_heading) {
1492 if (r == -ERANGE) {
b3b6e05e 1493 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 1494 << " need new head tid=" << tid << dendl;
b3b6e05e 1495 new_head(dpp, std::move(p));
f67539c2
TL
1496 return;
1497 }
1498 if (r < 0) {
b3b6e05e 1499 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1500 << " push_entries failed: r=" << r
1501 << " tid=" << tid << dendl;
1502 complete(std::move(p), r);
1503 return;
1504 }
1505 i = 0; // We've made forward progress, so reset the race counter!
1506 prep_then_push(std::move(p), r);
1507 } else {
1508 if (r < 0) {
b3b6e05e 1509 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1510 << " prepare_new_head failed: r=" << r
1511 << " tid=" << tid << dendl;
1512 complete(std::move(p), r);
1513 return;
1514 }
1515 new_heading = false;
1516 handle_new_head(std::move(p), r);
1517 }
1518 }
1519
1520 void handle_new_head(Ptr&& p, int r) {
1521 if (r == -ECANCELED) {
1522 if (p->i == MAX_RACE_RETRIES) {
1523 lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1524 << " canceled too many times, giving up: tid=" << tid << dendl;
1525 complete(std::move(p), -ECANCELED);
1526 return;
1527 }
1528 ++p->i;
1529 } else if (r) {
1530 complete(std::move(p), r);
1531 return;
1532 }
1533
1534 if (p->batch.empty()) {
1535 prep_then_push(std::move(p), 0);
1536 return;
1537 } else {
1538 push(std::move(p));
1539 return;
1540 }
1541 }
1542
b3b6e05e 1543 Pusher(const DoutPrefixProvider *dpp, FIFO* f, std::deque<cb::list>&& remaining,
f67539c2 1544 std::uint64_t tid, lr::AioCompletion* super)
b3b6e05e 1545 : Completion(dpp, super), f(f), remaining(std::move(remaining)),
f67539c2
TL
1546 tid(tid) {}
1547};
1548
b3b6e05e 1549void FIFO::push(const DoutPrefixProvider *dpp, const std::vector<cb::list>& data_bufs,
f67539c2
TL
1550 lr::AioCompletion* c)
1551{
1552 std::unique_lock l(m);
1553 auto tid = ++next_tid;
1554 auto max_entry_size = info.params.max_entry_size;
1555 auto need_new_head = info.need_new_head();
1556 l.unlock();
b3b6e05e 1557 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 1558 << " entering: tid=" << tid << dendl;
b3b6e05e 1559 auto p = std::make_unique<Pusher>(dpp, this, std::deque<cb::list>(data_bufs.begin(), data_bufs.end()),
f67539c2
TL
1560 tid, c);
1561 // Validate sizes
1562 for (const auto& bl : data_bufs) {
1563 if (bl.length() > max_entry_size) {
b3b6e05e 1564 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1565 << " entry bigger than max_entry_size tid=" << tid << dendl;
1566 Pusher::complete(std::move(p), -E2BIG);
1567 return;
1568 }
1569 }
1570
1571 if (data_bufs.empty() ) {
b3b6e05e 1572 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1573 << " empty push, returning success tid=" << tid << dendl;
1574 Pusher::complete(std::move(p), 0);
1575 return;
1576 }
1577
1578 if (need_new_head) {
b3b6e05e 1579 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 1580 << " need new head tid=" << tid << dendl;
b3b6e05e 1581 p->new_head(dpp, std::move(p));
f67539c2
TL
1582 } else {
1583 p->prep_then_push(std::move(p), 0);
1584 }
1585}
1586
b3b6e05e 1587int FIFO::list(const DoutPrefixProvider *dpp, int max_entries,
f67539c2
TL
1588 std::optional<std::string_view> markstr,
1589 std::vector<list_entry>* presult, bool* pmore,
1590 optional_yield y)
1591{
1592 std::unique_lock l(m);
1593 auto tid = ++next_tid;
1594 std::int64_t part_num = info.tail_part_num;
1595 l.unlock();
b3b6e05e 1596 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1597 << " entering: tid=" << tid << dendl;
1598 std::uint64_t ofs = 0;
1599 if (markstr) {
1600 auto marker = to_marker(*markstr);
1601 if (!marker) {
b3b6e05e 1602 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1603 << " invalid marker string: " << markstr
1604 << " tid= "<< tid << dendl;
1605 return -EINVAL;
1606 }
1607 part_num = marker->num;
1608 ofs = marker->ofs;
1609 }
1610
1611 std::vector<list_entry> result;
1612 result.reserve(max_entries);
1613 bool more = false;
1614
1615 std::vector<fifo::part_list_entry> entries;
1616 int r = 0;
1617 while (max_entries > 0) {
b3b6e05e 1618 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1619 << " max_entries=" << max_entries << " tid=" << tid << dendl;
1620 bool part_more = false;
1621 bool part_full = false;
1622
1623 std::unique_lock l(m);
1624 auto part_oid = info.part_oid(part_num);
1625 l.unlock();
1626
b3b6e05e 1627 r = list_part(dpp, ioctx, part_oid, {}, ofs, max_entries, &entries,
f67539c2
TL
1628 &part_more, &part_full, nullptr, tid, y);
1629 if (r == -ENOENT) {
b3b6e05e 1630 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1631 << " missing part, rereading metadata"
1632 << " tid= "<< tid << dendl;
b3b6e05e 1633 r = read_meta(dpp, tid, y);
f67539c2 1634 if (r < 0) {
b3b6e05e 1635 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1636 << " read_meta failed: r=" << r
1637 << " tid= "<< tid << dendl;
1638 return r;
1639 }
1640 if (part_num < info.tail_part_num) {
1641 /* raced with trim? restart */
b3b6e05e 1642 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1643 << " raced with trim, restarting: tid=" << tid << dendl;
1644 max_entries += result.size();
1645 result.clear();
1646 std::unique_lock l(m);
1647 part_num = info.tail_part_num;
1648 l.unlock();
1649 ofs = 0;
1650 continue;
1651 }
b3b6e05e 1652 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1653 << " assuming part was not written yet, so end of data: "
1654 << "tid=" << tid << dendl;
1655 more = false;
1656 r = 0;
1657 break;
1658 }
1659 if (r < 0) {
b3b6e05e 1660 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1661 << " list_entries failed: r=" << r
1662 << " tid= "<< tid << dendl;
1663 return r;
1664 }
1665 more = part_full || part_more;
1666 for (auto& entry : entries) {
1667 list_entry e;
1668 e.data = std::move(entry.data);
1669 e.marker = marker{part_num, entry.ofs}.to_string();
1670 e.mtime = entry.mtime;
1671 result.push_back(std::move(e));
1672 --max_entries;
1673 if (max_entries == 0)
1674 break;
1675 }
1676 entries.clear();
1677 if (max_entries > 0 &&
1678 part_more) {
1679 }
1680
1681 if (!part_full) {
b3b6e05e 1682 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1683 << " head part is not full, so we can assume we're done: "
1684 << "tid=" << tid << dendl;
1685 break;
1686 }
1687 if (!part_more) {
1688 ++part_num;
1689 ofs = 0;
1690 }
1691 }
1692 if (presult)
1693 *presult = std::move(result);
1694 if (pmore)
1695 *pmore = more;
1696 return 0;
1697}
1698
b3b6e05e 1699int FIFO::trim(const DoutPrefixProvider *dpp, std::string_view markstr, bool exclusive, optional_yield y)
f67539c2
TL
1700{
1701 bool overshoot = false;
1702 auto marker = to_marker(markstr);
1703 if (!marker) {
1704 return -EINVAL;
1705 }
1706 auto part_num = marker->num;
1707 auto ofs = marker->ofs;
1708 std::unique_lock l(m);
1709 auto tid = ++next_tid;
1710 auto hn = info.head_part_num;
1711 const auto max_part_size = info.params.max_part_size;
1712 if (part_num > hn) {
1713 l.unlock();
b3b6e05e 1714 auto r = read_meta(dpp, tid, y);
f67539c2
TL
1715 if (r < 0) {
1716 return r;
1717 }
1718 l.lock();
1719 auto hn = info.head_part_num;
1720 if (part_num > hn) {
1721 overshoot = true;
1722 part_num = hn;
1723 ofs = max_part_size;
1724 }
1725 }
1726 if (part_num < info.tail_part_num) {
1727 return -ENODATA;
1728 }
1729 auto pn = info.tail_part_num;
1730 l.unlock();
b3b6e05e 1731 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1732 << " entering: tid=" << tid << dendl;
1733
1734 int r = 0;
1735 while (pn < part_num) {
b3b6e05e 1736 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1737 << " pn=" << pn << " tid=" << tid << dendl;
1738 std::unique_lock l(m);
1739 l.unlock();
b3b6e05e 1740 r = trim_part(dpp, pn, max_part_size, std::nullopt, false, tid, y);
f67539c2 1741 if (r < 0 && r == -ENOENT) {
b3b6e05e 1742 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1743 << " trim_part failed: r=" << r
1744 << " tid= "<< tid << dendl;
1745 return r;
1746 }
1747 ++pn;
1748 }
b3b6e05e 1749 r = trim_part(dpp, part_num, ofs, std::nullopt, exclusive, tid, y);
f67539c2 1750 if (r < 0 && r != -ENOENT) {
b3b6e05e 1751 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1752 << " trim_part failed: r=" << r
1753 << " tid= "<< tid << dendl;
1754 return r;
1755 }
1756
1757 l.lock();
1758 auto tail_part_num = info.tail_part_num;
1759 auto objv = info.version;
1760 l.unlock();
1761 bool canceled = tail_part_num < part_num;
1762 int retries = 0;
1763 while ((tail_part_num < part_num) &&
1764 canceled &&
1765 (retries <= MAX_RACE_RETRIES)) {
b3b6e05e 1766 r = _update_meta(dpp, fifo::update{}.tail_part_num(part_num), objv, &canceled,
f67539c2
TL
1767 tid, y);
1768 if (r < 0) {
b3b6e05e 1769 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1770 << " _update_meta failed: r=" << r
1771 << " tid= "<< tid << dendl;
1772 return r;
1773 }
1774 if (canceled) {
b3b6e05e 1775 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1776 << " canceled: retries=" << retries
1777 << " tid=" << tid << dendl;
1778 l.lock();
1779 tail_part_num = info.tail_part_num;
1780 objv = info.version;
1781 l.unlock();
1782 ++retries;
1783 }
1784 }
1785 if (canceled) {
b3b6e05e 1786 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1787 << " canceled too many times, giving up: tid=" << tid << dendl;
1788 return -EIO;
1789 }
1790 return overshoot ? -ENODATA : 0;
1791}
1792
1793struct Trimmer : public Completion<Trimmer> {
1794 FIFO* fifo;
1795 std::int64_t part_num;
1796 std::uint64_t ofs;
1797 std::int64_t pn;
1798 bool exclusive;
1799 std::uint64_t tid;
1800 bool update = false;
1801 bool reread = false;
1802 bool canceled = false;
1803 bool overshoot = false;
1804 int retries = 0;
1805
b3b6e05e 1806 Trimmer(const DoutPrefixProvider *dpp, FIFO* fifo, std::int64_t part_num, std::uint64_t ofs, std::int64_t pn,
f67539c2 1807 bool exclusive, lr::AioCompletion* super, std::uint64_t tid)
b3b6e05e 1808 : Completion(dpp, super), fifo(fifo), part_num(part_num), ofs(ofs), pn(pn),
f67539c2
TL
1809 exclusive(exclusive), tid(tid) {}
1810
b3b6e05e
TL
1811 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
1812 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1813 << " entering: tid=" << tid << dendl;
1814
1815 if (reread) {
1816 reread = false;
1817 if (r < 0) {
b3b6e05e 1818 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1819 << " read_meta failed: r="
1820 << r << " tid=" << tid << dendl;
1821 complete(std::move(p), r);
1822 return;
1823 }
1824 std::unique_lock l(fifo->m);
1825 auto hn = fifo->info.head_part_num;
1826 const auto max_part_size = fifo->info.params.max_part_size;
1827 const auto tail_part_num = fifo->info.tail_part_num;
1828 l.unlock();
1829 if (part_num > hn) {
1830 part_num = hn;
1831 ofs = max_part_size;
1832 overshoot = true;
1833 }
1834 if (part_num < tail_part_num) {
1835 complete(std::move(p), -ENODATA);
1836 return;
1837 }
1838 pn = tail_part_num;
1839 if (pn < part_num) {
b3b6e05e 1840 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1841 << " pn=" << pn << " tid=" << tid << dendl;
1842 fifo->trim_part(pn++, max_part_size, std::nullopt,
1843 false, tid, call(std::move(p)));
1844 } else {
1845 update = true;
1846 canceled = tail_part_num < part_num;
1847 fifo->trim_part(part_num, ofs, std::nullopt, exclusive, tid,
1848 call(std::move(p)));
1849 }
1850 return;
1851 }
1852
1853 if (r == -ENOENT) {
1854 r = 0;
1855 }
1856
1857 if (r < 0) {
b3b6e05e 1858 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1859 << (update ? " update_meta " : " trim ") << "failed: r="
1860 << r << " tid=" << tid << dendl;
1861 complete(std::move(p), r);
1862 return;
1863 }
1864
1865 if (!update) {
b3b6e05e 1866 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1867 << " handling preceding trim callback: tid=" << tid << dendl;
1868 retries = 0;
1869 if (pn < part_num) {
b3b6e05e 1870 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1871 << " pn=" << pn << " tid=" << tid << dendl;
1872 std::unique_lock l(fifo->m);
1873 const auto max_part_size = fifo->info.params.max_part_size;
1874 l.unlock();
1875 fifo->trim_part(pn++, max_part_size, std::nullopt,
1876 false, tid, call(std::move(p)));
1877 return;
1878 }
1879
1880 std::unique_lock l(fifo->m);
1881 const auto tail_part_num = fifo->info.tail_part_num;
1882 l.unlock();
1883 update = true;
1884 canceled = tail_part_num < part_num;
1885 fifo->trim_part(part_num, ofs, std::nullopt, exclusive, tid,
1886 call(std::move(p)));
1887 return;
1888 }
1889
b3b6e05e 1890 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1891 << " handling update-needed callback: tid=" << tid << dendl;
1892 std::unique_lock l(fifo->m);
1893 auto tail_part_num = fifo->info.tail_part_num;
1894 auto objv = fifo->info.version;
1895 l.unlock();
1896 if ((tail_part_num < part_num) &&
1897 canceled) {
1898 if (retries > MAX_RACE_RETRIES) {
b3b6e05e 1899 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1900 << " canceled too many times, giving up: tid=" << tid << dendl;
1901 complete(std::move(p), -EIO);
1902 return;
1903 }
1904 ++retries;
b3b6e05e 1905 fifo->_update_meta(dpp, fifo::update{}
f67539c2
TL
1906 .tail_part_num(part_num), objv, &canceled,
1907 tid, call(std::move(p)));
1908 } else {
1909 complete(std::move(p), overshoot ? -ENODATA : 0);
1910 }
1911 }
1912};
1913
b3b6e05e 1914void FIFO::trim(const DoutPrefixProvider *dpp, std::string_view markstr, bool exclusive,
f67539c2
TL
1915 lr::AioCompletion* c) {
1916 auto marker = to_marker(markstr);
1917 auto realmark = marker.value_or(::rgw::cls::fifo::marker{});
1918 std::unique_lock l(m);
1919 const auto hn = info.head_part_num;
1920 const auto max_part_size = info.params.max_part_size;
1921 const auto pn = info.tail_part_num;
1922 const auto part_oid = info.part_oid(pn);
1923 auto tid = ++next_tid;
1924 l.unlock();
b3b6e05e 1925 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 1926 << " entering: tid=" << tid << dendl;
b3b6e05e 1927 auto trimmer = std::make_unique<Trimmer>(dpp, this, realmark.num, realmark.ofs,
f67539c2
TL
1928 pn, exclusive, c, tid);
1929 if (!marker) {
1930 Trimmer::complete(std::move(trimmer), -EINVAL);
1931 return;
1932 }
1933 ++trimmer->pn;
1934 auto ofs = marker->ofs;
1935 if (marker->num > hn) {
1936 trimmer->reread = true;
b3b6e05e 1937 read_meta(dpp, tid, Trimmer::call(std::move(trimmer)));
f67539c2
TL
1938 return;
1939 }
1940 if (pn < marker->num) {
b3b6e05e 1941 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1942 << " pn=" << pn << " tid=" << tid << dendl;
1943 ofs = max_part_size;
1944 } else {
1945 trimmer->update = true;
1946 }
1947 trim_part(pn, ofs, std::nullopt, exclusive,
1948 tid, Trimmer::call(std::move(trimmer)));
1949}
1950
b3b6e05e 1951int FIFO::get_part_info(const DoutPrefixProvider *dpp, int64_t part_num,
f67539c2
TL
1952 fifo::part_header* header,
1953 optional_yield y)
1954{
1955 std::unique_lock l(m);
1956 const auto part_oid = info.part_oid(part_num);
1957 auto tid = ++next_tid;
1958 l.unlock();
b3b6e05e 1959 auto r = rgw::cls::fifo::get_part_info(dpp, ioctx, part_oid, header, tid, y);
f67539c2 1960 if (r < 0) {
b3b6e05e 1961 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1962 << " get_part_info failed: r="
1963 << r << " tid=" << tid << dendl;
1964 }
1965 return r;
1966}
1967
1968void FIFO::get_part_info(int64_t part_num,
1969 fifo::part_header* header,
1970 lr::AioCompletion* c)
1971{
1972 std::unique_lock l(m);
1973 const auto part_oid = info.part_oid(part_num);
1974 auto tid = ++next_tid;
1975 l.unlock();
1976 auto op = rgw::cls::fifo::get_part_info(cct, header, tid);
1977 auto r = ioctx.aio_operate(part_oid, c, &op, nullptr);
1978 ceph_assert(r >= 0);
1979}
1980
1981struct InfoGetter : Completion<InfoGetter> {
1982 FIFO* fifo;
1983 fifo::part_header header;
1984 fu2::function<void(int r, fifo::part_header&&)> f;
1985 std::uint64_t tid;
1986 bool headerread = false;
1987
b3b6e05e 1988 InfoGetter(const DoutPrefixProvider *dpp, FIFO* fifo, fu2::function<void(int r, fifo::part_header&&)> f,
f67539c2 1989 std::uint64_t tid, lr::AioCompletion* super)
b3b6e05e
TL
1990 : Completion(dpp, super), fifo(fifo), f(std::move(f)), tid(tid) {}
1991 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
f67539c2
TL
1992 if (!headerread) {
1993 if (r < 0) {
b3b6e05e 1994 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1995 << " read_meta failed: r="
1996 << r << " tid=" << tid << dendl;
1997 if (f)
1998 f(r, {});
1999 complete(std::move(p), r);
2000 return;
2001 }
2002
2003 auto info = fifo->meta();
2004 auto hpn = info.head_part_num;
2005 if (hpn < 0) {
b3b6e05e 2006 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2007 << " no head, returning empty partinfo r="
2008 << r << " tid=" << tid << dendl;
2009 if (f)
2010 f(0, {});
2011 complete(std::move(p), r);
2012 return;
2013 }
2014 headerread = true;
2015 auto op = rgw::cls::fifo::get_part_info(fifo->cct, &header, tid);
2016 std::unique_lock l(fifo->m);
2017 auto oid = fifo->info.part_oid(hpn);
2018 l.unlock();
2019 r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op,
2020 nullptr);
2021 ceph_assert(r >= 0);
2022 return;
2023 }
2024
2025 if (r < 0) {
b3b6e05e 2026 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2027 << " get_part_info failed: r="
2028 << r << " tid=" << tid << dendl;
2029 }
2030
2031 if (f)
2032 f(r, std::move(header));
2033 complete(std::move(p), r);
2034 return;
2035 }
2036};
2037
b3b6e05e 2038void FIFO::get_head_info(const DoutPrefixProvider *dpp, fu2::unique_function<void(int r,
f67539c2
TL
2039 fifo::part_header&&)> f,
2040 lr::AioCompletion* c)
2041{
2042 std::unique_lock l(m);
2043 auto tid = ++next_tid;
2044 l.unlock();
b3b6e05e
TL
2045 auto ig = std::make_unique<InfoGetter>(dpp, this, std::move(f), tid, c);
2046 read_meta(dpp, tid, InfoGetter::call(std::move(ig)));
f67539c2
TL
2047}
2048
2049struct JournalProcessor : public Completion<JournalProcessor> {
2050private:
2051 FIFO* const fifo;
2052
2053 std::vector<fifo::journal_entry> processed;
2054 std::multimap<std::int64_t, fifo::journal_entry> journal;
2055 std::multimap<std::int64_t, fifo::journal_entry>::iterator iter;
2056 std::int64_t new_tail;
2057 std::int64_t new_head;
2058 std::int64_t new_max;
2059 int race_retries = 0;
2060 bool first_pp = true;
2061 bool canceled = false;
2062 std::uint64_t tid;
2063
2064 enum {
2065 entry_callback,
2066 pp_callback,
2067 } state;
2068
2069 void create_part(Ptr&& p, int64_t part_num,
2070 std::string_view tag) {
2071 ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2072 << " entering: tid=" << tid << dendl;
2073 state = entry_callback;
2074 lr::ObjectWriteOperation op;
2075 op.create(false); /* We don't need exclusivity, part_init ensures
2076 we're creating from the same journal entry. */
2077 std::unique_lock l(fifo->m);
2078 part_init(&op, tag, fifo->info.params);
2079 auto oid = fifo->info.part_oid(part_num);
2080 l.unlock();
2081 auto r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op);
2082 ceph_assert(r >= 0);
2083 return;
2084 }
2085
2086 void remove_part(Ptr&& p, int64_t part_num,
2087 std::string_view tag) {
2088 ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2089 << " entering: tid=" << tid << dendl;
2090 state = entry_callback;
2091 lr::ObjectWriteOperation op;
2092 op.remove();
2093 std::unique_lock l(fifo->m);
2094 auto oid = fifo->info.part_oid(part_num);
2095 l.unlock();
2096 auto r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op);
2097 ceph_assert(r >= 0);
2098 return;
2099 }
2100
b3b6e05e 2101 void finish_je(const DoutPrefixProvider *dpp, Ptr&& p, int r,
f67539c2 2102 const fifo::journal_entry& entry) {
b3b6e05e 2103 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2104 << " entering: tid=" << tid << dendl;
2105
b3b6e05e 2106 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2107 << " finishing entry: entry=" << entry
2108 << " tid=" << tid << dendl;
2109
2110 if (entry.op == fifo::journal_entry::Op::remove && r == -ENOENT)
2111 r = 0;
2112
2113 if (r < 0) {
b3b6e05e 2114 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2115 << " processing entry failed: entry=" << entry
2116 << " r=" << r << " tid=" << tid << dendl;
2117 complete(std::move(p), r);
2118 return;
2119 } else {
2120 switch (entry.op) {
2121 case fifo::journal_entry::Op::unknown:
2122 case fifo::journal_entry::Op::set_head:
2123 // Can't happen. Filtered out in process.
2124 complete(std::move(p), -EIO);
2125 return;
2126
2127 case fifo::journal_entry::Op::create:
2128 if (entry.part_num > new_max) {
2129 new_max = entry.part_num;
2130 }
2131 break;
2132 case fifo::journal_entry::Op::remove:
2133 if (entry.part_num >= new_tail) {
2134 new_tail = entry.part_num + 1;
2135 }
2136 break;
2137 }
2138 processed.push_back(entry);
2139 }
2140 ++iter;
b3b6e05e 2141 process(dpp, std::move(p));
f67539c2
TL
2142 }
2143
b3b6e05e
TL
2144 void postprocess(const DoutPrefixProvider *dpp, Ptr&& p) {
2145 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2146 << " entering: tid=" << tid << dendl;
2147 if (processed.empty()) {
b3b6e05e 2148 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2149 << " nothing to update any more: race_retries="
2150 << race_retries << " tid=" << tid << dendl;
2151 complete(std::move(p), 0);
2152 return;
2153 }
b3b6e05e 2154 pp_run(dpp, std::move(p), 0, false);
f67539c2
TL
2155 }
2156
2157public:
2158
b3b6e05e
TL
2159 JournalProcessor(const DoutPrefixProvider *dpp, FIFO* fifo, std::uint64_t tid, lr::AioCompletion* super)
2160 : Completion(dpp, super), fifo(fifo), tid(tid) {
f67539c2
TL
2161 std::unique_lock l(fifo->m);
2162 journal = fifo->info.journal;
2163 iter = journal.begin();
2164 new_tail = fifo->info.tail_part_num;
2165 new_head = fifo->info.head_part_num;
2166 new_max = fifo->info.max_push_part_num;
2167 }
2168
b3b6e05e
TL
2169 void pp_run(const DoutPrefixProvider *dpp, Ptr&& p, int r, bool canceled) {
2170 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2171 << " entering: tid=" << tid << dendl;
2172 std::optional<int64_t> tail_part_num;
2173 std::optional<int64_t> head_part_num;
2174 std::optional<int64_t> max_part_num;
2175
2176 if (r < 0) {
b3b6e05e 2177 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2178 << " failed, r=: " << r << " tid=" << tid << dendl;
2179 complete(std::move(p), r);
2180 }
2181
2182
b3b6e05e 2183 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2184 << " postprocessing: race_retries="
2185 << race_retries << " tid=" << tid << dendl;
2186
2187 if (!first_pp && r == 0 && !canceled) {
b3b6e05e 2188 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2189 << " nothing to update any more: race_retries="
2190 << race_retries << " tid=" << tid << dendl;
2191 complete(std::move(p), 0);
2192 return;
2193 }
2194
2195 first_pp = false;
2196
2197 if (canceled) {
2198 if (race_retries >= MAX_RACE_RETRIES) {
b3b6e05e 2199 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2200 << " canceled too many times, giving up: tid="
2201 << tid << dendl;
2202 complete(std::move(p), -ECANCELED);
2203 return;
2204 }
b3b6e05e 2205 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2206 << " update canceled, retrying: race_retries="
2207 << race_retries << " tid=" << tid << dendl;
2208
2209 ++race_retries;
2210
2211 std::vector<fifo::journal_entry> new_processed;
2212 std::unique_lock l(fifo->m);
2213 for (auto& e : processed) {
2214 auto jiter = fifo->info.journal.find(e.part_num);
2215 /* journal entry was already processed */
2216 if (jiter == fifo->info.journal.end() ||
2217 !(jiter->second == e)) {
2218 continue;
2219 }
2220 new_processed.push_back(e);
2221 }
2222 processed = std::move(new_processed);
2223 }
2224
2225 std::unique_lock l(fifo->m);
2226 auto objv = fifo->info.version;
2227 if (new_tail > fifo->info.tail_part_num) {
2228 tail_part_num = new_tail;
2229 }
2230
2231 if (new_head > fifo->info.head_part_num) {
2232 head_part_num = new_head;
2233 }
2234
2235 if (new_max > fifo->info.max_push_part_num) {
2236 max_part_num = new_max;
2237 }
2238 l.unlock();
2239
2240 if (processed.empty() &&
2241 !tail_part_num &&
2242 !max_part_num) {
2243 /* nothing to update anymore */
b3b6e05e 2244 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2245 << " nothing to update any more: race_retries="
2246 << race_retries << " tid=" << tid << dendl;
2247 complete(std::move(p), 0);
2248 return;
2249 }
2250 state = pp_callback;
b3b6e05e 2251 fifo->_update_meta(dpp, fifo::update{}
f67539c2
TL
2252 .tail_part_num(tail_part_num)
2253 .head_part_num(head_part_num)
2254 .max_push_part_num(max_part_num)
2255 .journal_entries_rm(processed),
2256 objv, &this->canceled, tid, call(std::move(p)));
2257 return;
2258 }
2259
2260 JournalProcessor(const JournalProcessor&) = delete;
2261 JournalProcessor& operator =(const JournalProcessor&) = delete;
2262 JournalProcessor(JournalProcessor&&) = delete;
2263 JournalProcessor& operator =(JournalProcessor&&) = delete;
2264
b3b6e05e
TL
2265 void process(const DoutPrefixProvider *dpp, Ptr&& p) {
2266 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2267 << " entering: tid=" << tid << dendl;
2268 while (iter != journal.end()) {
b3b6e05e 2269 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2270 << " processing entry: entry=" << *iter
2271 << " tid=" << tid << dendl;
2272 const auto entry = iter->second;
2273 switch (entry.op) {
2274 case fifo::journal_entry::Op::create:
2275 create_part(std::move(p), entry.part_num, entry.part_tag);
2276 return;
2277 case fifo::journal_entry::Op::set_head:
2278 if (entry.part_num > new_head) {
2279 new_head = entry.part_num;
2280 }
2281 processed.push_back(entry);
2282 ++iter;
2283 continue;
2284 case fifo::journal_entry::Op::remove:
2285 remove_part(std::move(p), entry.part_num, entry.part_tag);
2286 return;
2287 default:
2288 lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
2289 << " unknown journaled op: entry=" << entry << " tid="
2290 << tid << dendl;
2291 complete(std::move(p), -EIO);
2292 return;
2293 }
2294 }
b3b6e05e 2295 postprocess(dpp, std::move(p));
f67539c2
TL
2296 return;
2297 }
2298
b3b6e05e
TL
2299 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
2300 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2301 << " entering: tid=" << tid << dendl;
2302 switch (state) {
2303 case entry_callback:
b3b6e05e 2304 finish_je(dpp, std::move(p), r, iter->second);
f67539c2
TL
2305 return;
2306 case pp_callback:
2307 auto c = canceled;
2308 canceled = false;
b3b6e05e 2309 pp_run(dpp, std::move(p), r, c);
f67539c2
TL
2310 return;
2311 }
2312
2313 abort();
2314 }
2315
2316};
2317
b3b6e05e
TL
2318void FIFO::process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c) {
2319 auto p = std::make_unique<JournalProcessor>(dpp, this, tid, c);
2320 p->process(dpp, std::move(p));
f67539c2
TL
2321}
2322
2323struct Lister : Completion<Lister> {
2324 FIFO* f;
2325 std::vector<list_entry> result;
2326 bool more = false;
2327 std::int64_t part_num;
2328 std::uint64_t ofs;
2329 int max_entries;
2330 int r_out = 0;
2331 std::vector<fifo::part_list_entry> entries;
2332 bool part_more = false;
2333 bool part_full = false;
2334 std::vector<list_entry>* entries_out;
2335 bool* more_out;
2336 std::uint64_t tid;
2337
2338 bool read = false;
2339
2340 void complete(Ptr&& p, int r) {
2341 if (r >= 0) {
2342 if (more_out) *more_out = more;
2343 if (entries_out) *entries_out = std::move(result);
2344 }
2345 Completion::complete(std::move(p), r);
2346 }
2347
2348public:
b3b6e05e 2349 Lister(const DoutPrefixProvider *dpp, FIFO* f, std::int64_t part_num, std::uint64_t ofs, int max_entries,
f67539c2
TL
2350 std::vector<list_entry>* entries_out, bool* more_out,
2351 std::uint64_t tid, lr::AioCompletion* super)
b3b6e05e 2352 : Completion(dpp, super), f(f), part_num(part_num), ofs(ofs), max_entries(max_entries),
f67539c2
TL
2353 entries_out(entries_out), more_out(more_out), tid(tid) {
2354 result.reserve(max_entries);
2355 }
2356
2357 Lister(const Lister&) = delete;
2358 Lister& operator =(const Lister&) = delete;
2359 Lister(Lister&&) = delete;
2360 Lister& operator =(Lister&&) = delete;
2361
b3b6e05e 2362 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
f67539c2
TL
2363 if (read)
2364 handle_read(std::move(p), r);
2365 else
b3b6e05e 2366 handle_list(dpp, std::move(p), r);
f67539c2
TL
2367 }
2368
2369 void list(Ptr&& p) {
2370 if (max_entries > 0) {
2371 part_more = false;
2372 part_full = false;
2373 entries.clear();
2374
2375 std::unique_lock l(f->m);
2376 auto part_oid = f->info.part_oid(part_num);
2377 l.unlock();
2378
2379 read = false;
2380 auto op = list_part(f->cct, {}, ofs, max_entries, &r_out,
2381 &entries, &part_more, &part_full,
2382 nullptr, tid);
2383 f->ioctx.aio_operate(part_oid, call(std::move(p)), &op, nullptr);
2384 } else {
2385 complete(std::move(p), 0);
2386 }
2387 }
2388
2389 void handle_read(Ptr&& p, int r) {
2390 read = false;
2391 if (r >= 0) r = r_out;
2392 r_out = 0;
2393
2394 if (r < 0) {
2395 complete(std::move(p), r);
2396 return;
2397 }
2398
2399 if (part_num < f->info.tail_part_num) {
2400 /* raced with trim? restart */
2401 max_entries += result.size();
2402 result.clear();
2403 part_num = f->info.tail_part_num;
2404 ofs = 0;
2405 list(std::move(p));
2406 return;
2407 }
2408 /* assuming part was not written yet, so end of data */
2409 more = false;
2410 complete(std::move(p), 0);
2411 return;
2412 }
2413
b3b6e05e 2414 void handle_list(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
f67539c2
TL
2415 if (r >= 0) r = r_out;
2416 r_out = 0;
2417 std::unique_lock l(f->m);
2418 auto part_oid = f->info.part_oid(part_num);
2419 l.unlock();
2420 if (r == -ENOENT) {
2421 read = true;
b3b6e05e 2422 f->read_meta(dpp, tid, call(std::move(p)));
f67539c2
TL
2423 return;
2424 }
2425 if (r < 0) {
2426 complete(std::move(p), r);
2427 return;
2428 }
2429
2430 more = part_full || part_more;
2431 for (auto& entry : entries) {
2432 list_entry e;
2433 e.data = std::move(entry.data);
2434 e.marker = marker{part_num, entry.ofs}.to_string();
2435 e.mtime = entry.mtime;
2436 result.push_back(std::move(e));
2437 }
2438 max_entries -= entries.size();
2439 entries.clear();
2440 if (max_entries > 0 && part_more) {
2441 list(std::move(p));
2442 return;
2443 }
2444
2445 if (!part_full) { /* head part is not full */
2446 complete(std::move(p), 0);
2447 return;
2448 }
2449 ++part_num;
2450 ofs = 0;
2451 list(std::move(p));
2452 }
2453};
2454
b3b6e05e 2455void FIFO::list(const DoutPrefixProvider *dpp, int max_entries,
f67539c2
TL
2456 std::optional<std::string_view> markstr,
2457 std::vector<list_entry>* out,
2458 bool* more,
2459 lr::AioCompletion* c) {
2460 std::unique_lock l(m);
2461 auto tid = ++next_tid;
2462 std::int64_t part_num = info.tail_part_num;
2463 l.unlock();
2464 std::uint64_t ofs = 0;
2465 std::optional<::rgw::cls::fifo::marker> marker;
2466
2467 if (markstr) {
2468 marker = to_marker(*markstr);
2469 if (marker) {
2470 part_num = marker->num;
2471 ofs = marker->ofs;
2472 }
2473 }
2474
b3b6e05e 2475 auto ls = std::make_unique<Lister>(dpp, this, part_num, ofs, max_entries, out,
f67539c2
TL
2476 more, tid, c);
2477 if (markstr && !marker) {
2478 auto l = ls.get();
2479 l->complete(std::move(ls), -EINVAL);
2480 } else {
2481 ls->list(std::move(ls));
2482 }
2483}
2484}