]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/driver/rados/cls_fifo_legacy.cc
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / rgw / driver / rados / cls_fifo_legacy.cc
CommitLineData
f67539c2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2020 Red Hat <contact@redhat.com>
7 * Author: Adam C. Emerson
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
39ae355f 16#include <algorithm>
f67539c2
TL
17#include <cstdint>
18#include <numeric>
19#include <optional>
20#include <string_view>
21
f67539c2
TL
22#include <fmt/format.h>
23
24#include "include/rados/librados.hpp"
25
26#include "include/buffer.h"
27
28#include "common/async/yield_context.h"
29#include "common/random_string.h"
30
31#include "cls/fifo/cls_fifo_types.h"
32#include "cls/fifo/cls_fifo_ops.h"
33
34#include "cls_fifo_legacy.h"
35
36namespace rgw::cls::fifo {
f67539c2
TL
37namespace cb = ceph::buffer;
38namespace fifo = rados::cls::fifo;
39
40using ceph::from_error_code;
41
42inline constexpr auto MAX_RACE_RETRIES = 10;
43
44void create_meta(lr::ObjectWriteOperation* op,
45 std::string_view id,
46 std::optional<fifo::objv> objv,
47 std::optional<std::string_view> oid_prefix,
48 bool exclusive,
49 std::uint64_t max_part_size,
50 std::uint64_t max_entry_size)
51{
52 fifo::op::create_meta cm;
53
54 cm.id = id;
55 cm.version = objv;
56 cm.oid_prefix = oid_prefix;
57 cm.max_part_size = max_part_size;
58 cm.max_entry_size = max_entry_size;
59 cm.exclusive = exclusive;
60
61 cb::list in;
62 encode(cm, in);
63 op->exec(fifo::op::CLASS, fifo::op::CREATE_META, in);
64}
65
b3b6e05e 66int get_meta(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid,
f67539c2
TL
67 std::optional<fifo::objv> objv, fifo::info* info,
68 std::uint32_t* part_header_size,
69 std::uint32_t* part_entry_overhead,
70 uint64_t tid, optional_yield y,
71 bool probe)
72{
73 lr::ObjectReadOperation op;
74 fifo::op::get_meta gm;
75 gm.version = objv;
76 cb::list in;
77 encode(gm, in);
78 cb::list bl;
79
80 op.exec(fifo::op::CLASS, fifo::op::GET_META, in,
81 &bl, nullptr);
b3b6e05e 82 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, nullptr, y);
f67539c2
TL
83 if (r >= 0) try {
84 fifo::op::get_meta_reply reply;
85 auto iter = bl.cbegin();
86 decode(reply, iter);
87 if (info) *info = std::move(reply.info);
88 if (part_header_size) *part_header_size = reply.part_header_size;
89 if (part_entry_overhead)
90 *part_entry_overhead = reply.part_entry_overhead;
91 } catch (const cb::error& err) {
b3b6e05e 92 ldpp_dout(dpp, -1)
f67539c2
TL
93 << __PRETTY_FUNCTION__ << ":" << __LINE__
94 << " decode failed: " << err.what()
95 << " tid=" << tid << dendl;
96 r = from_error_code(err.code());
97 } else if (!(probe && (r == -ENOENT || r == -ENODATA))) {
b3b6e05e 98 ldpp_dout(dpp, -1)
f67539c2
TL
99 << __PRETTY_FUNCTION__ << ":" << __LINE__
100 << " fifo::op::GET_META failed r=" << r << " tid=" << tid
101 << dendl;
102 }
103 return r;
104};
105
106namespace {
107void update_meta(lr::ObjectWriteOperation* op, const fifo::objv& objv,
108 const fifo::update& update)
109{
110 fifo::op::update_meta um;
111
112 um.version = objv;
113 um.tail_part_num = update.tail_part_num();
114 um.head_part_num = update.head_part_num();
115 um.min_push_part_num = update.min_push_part_num();
116 um.max_push_part_num = update.max_push_part_num();
117 um.journal_entries_add = std::move(update).journal_entries_add();
118 um.journal_entries_rm = std::move(update).journal_entries_rm();
119
120 cb::list in;
121 encode(um, in);
122 op->exec(fifo::op::CLASS, fifo::op::UPDATE_META, in);
123}
124
39ae355f 125void part_init(lr::ObjectWriteOperation* op, fifo::data_params params)
f67539c2
TL
126{
127 fifo::op::init_part ip;
128
f67539c2
TL
129 ip.params = params;
130
131 cb::list in;
132 encode(ip, in);
133 op->exec(fifo::op::CLASS, fifo::op::INIT_PART, in);
134}
135
39ae355f 136int push_part(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid,
f67539c2
TL
137 std::deque<cb::list> data_bufs, std::uint64_t tid,
138 optional_yield y)
139{
140 lr::ObjectWriteOperation op;
141 fifo::op::push_part pp;
142
39ae355f
TL
143 op.assert_exists();
144
f67539c2
TL
145 pp.data_bufs = data_bufs;
146 pp.total_len = 0;
147
148 for (const auto& bl : data_bufs)
149 pp.total_len += bl.length();
150
151 cb::list in;
152 encode(pp, in);
153 auto retval = 0;
154 op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in, nullptr, &retval);
b3b6e05e 155 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y, lr::OPERATION_RETURNVEC);
f67539c2 156 if (r < 0) {
b3b6e05e 157 ldpp_dout(dpp, -1)
f67539c2
TL
158 << __PRETTY_FUNCTION__ << ":" << __LINE__
159 << " fifo::op::PUSH_PART failed r=" << r
160 << " tid=" << tid << dendl;
161 return r;
162 }
163 if (retval < 0) {
b3b6e05e 164 ldpp_dout(dpp, -1)
f67539c2
TL
165 << __PRETTY_FUNCTION__ << ":" << __LINE__
166 << " error handling response retval=" << retval
167 << " tid=" << tid << dendl;
168 }
169 return retval;
170}
171
39ae355f 172void push_part(lr::IoCtx& ioctx, const std::string& oid,
f67539c2
TL
173 std::deque<cb::list> data_bufs, std::uint64_t tid,
174 lr::AioCompletion* c)
175{
176 lr::ObjectWriteOperation op;
177 fifo::op::push_part pp;
178
f67539c2
TL
179 pp.data_bufs = data_bufs;
180 pp.total_len = 0;
181
182 for (const auto& bl : data_bufs)
183 pp.total_len += bl.length();
184
185 cb::list in;
186 encode(pp, in);
187 op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in);
188 auto r = ioctx.aio_operate(oid, c, &op, lr::OPERATION_RETURNVEC);
189 ceph_assert(r >= 0);
190}
191
192void trim_part(lr::ObjectWriteOperation* op,
f67539c2
TL
193 std::uint64_t ofs, bool exclusive)
194{
195 fifo::op::trim_part tp;
196
f67539c2
TL
197 tp.ofs = ofs;
198 tp.exclusive = exclusive;
199
200 cb::list in;
201 encode(tp, in);
202 op->exec(fifo::op::CLASS, fifo::op::TRIM_PART, in);
203}
204
b3b6e05e 205int list_part(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid,
39ae355f 206 std::uint64_t ofs, std::uint64_t max_entries,
f67539c2 207 std::vector<fifo::part_list_entry>* entries,
39ae355f 208 bool* more, bool* full_part,
f67539c2
TL
209 std::uint64_t tid, optional_yield y)
210{
211 lr::ObjectReadOperation op;
212 fifo::op::list_part lp;
213
f67539c2
TL
214 lp.ofs = ofs;
215 lp.max_entries = max_entries;
216
217 cb::list in;
218 encode(lp, in);
219 cb::list bl;
220 op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in, &bl, nullptr);
b3b6e05e 221 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, nullptr, y);
f67539c2
TL
222 if (r >= 0) try {
223 fifo::op::list_part_reply reply;
224 auto iter = bl.cbegin();
225 decode(reply, iter);
226 if (entries) *entries = std::move(reply.entries);
227 if (more) *more = reply.more;
228 if (full_part) *full_part = reply.full_part;
f67539c2 229 } catch (const cb::error& err) {
b3b6e05e 230 ldpp_dout(dpp, -1)
f67539c2
TL
231 << __PRETTY_FUNCTION__ << ":" << __LINE__
232 << " decode failed: " << err.what()
233 << " tid=" << tid << dendl;
234 r = from_error_code(err.code());
235 } else if (r != -ENOENT) {
b3b6e05e 236 ldpp_dout(dpp, -1)
f67539c2
TL
237 << __PRETTY_FUNCTION__ << ":" << __LINE__
238 << " fifo::op::LIST_PART failed r=" << r << " tid=" << tid
239 << dendl;
240 }
241 return r;
242}
243
244struct list_entry_completion : public lr::ObjectOperationCompletion {
245 CephContext* cct;
246 int* r_out;
247 std::vector<fifo::part_list_entry>* entries;
248 bool* more;
249 bool* full_part;
f67539c2
TL
250 std::uint64_t tid;
251
252 list_entry_completion(CephContext* cct, int* r_out, std::vector<fifo::part_list_entry>* entries,
39ae355f 253 bool* more, bool* full_part, std::uint64_t tid)
f67539c2 254 : cct(cct), r_out(r_out), entries(entries), more(more),
39ae355f 255 full_part(full_part), tid(tid) {}
f67539c2
TL
256 virtual ~list_entry_completion() = default;
257 void handle_completion(int r, bufferlist& bl) override {
258 if (r >= 0) try {
259 fifo::op::list_part_reply reply;
260 auto iter = bl.cbegin();
261 decode(reply, iter);
262 if (entries) *entries = std::move(reply.entries);
263 if (more) *more = reply.more;
264 if (full_part) *full_part = reply.full_part;
f67539c2
TL
265 } catch (const cb::error& err) {
266 lderr(cct)
267 << __PRETTY_FUNCTION__ << ":" << __LINE__
268 << " decode failed: " << err.what()
269 << " tid=" << tid << dendl;
270 r = from_error_code(err.code());
271 } else if (r < 0) {
272 lderr(cct)
273 << __PRETTY_FUNCTION__ << ":" << __LINE__
274 << " fifo::op::LIST_PART failed r=" << r << " tid=" << tid
275 << dendl;
276 }
277 if (r_out) *r_out = r;
278 }
279};
280
281lr::ObjectReadOperation list_part(CephContext* cct,
f67539c2
TL
282 std::uint64_t ofs,
283 std::uint64_t max_entries,
284 int* r_out,
285 std::vector<fifo::part_list_entry>* entries,
286 bool* more, bool* full_part,
39ae355f 287 std::uint64_t tid)
f67539c2
TL
288{
289 lr::ObjectReadOperation op;
290 fifo::op::list_part lp;
291
f67539c2
TL
292 lp.ofs = ofs;
293 lp.max_entries = max_entries;
294
295 cb::list in;
296 encode(lp, in);
297 op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in,
298 new list_entry_completion(cct, r_out, entries, more, full_part,
39ae355f 299 tid));
f67539c2
TL
300 return op;
301}
302
b3b6e05e 303int get_part_info(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid,
f67539c2
TL
304 fifo::part_header* header,
305 std::uint64_t tid, optional_yield y)
306{
307 lr::ObjectReadOperation op;
308 fifo::op::get_part_info gpi;
309
310 cb::list in;
311 cb::list bl;
312 encode(gpi, in);
313 op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in, &bl, nullptr);
b3b6e05e 314 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, nullptr, y);
f67539c2
TL
315 if (r >= 0) try {
316 fifo::op::get_part_info_reply reply;
317 auto iter = bl.cbegin();
318 decode(reply, iter);
319 if (header) *header = std::move(reply.header);
320 } catch (const cb::error& err) {
b3b6e05e 321 ldpp_dout(dpp, -1)
f67539c2
TL
322 << __PRETTY_FUNCTION__ << ":" << __LINE__
323 << " decode failed: " << err.what()
324 << " tid=" << tid << dendl;
325 r = from_error_code(err.code());
326 } else {
b3b6e05e 327 ldpp_dout(dpp, -1)
f67539c2
TL
328 << __PRETTY_FUNCTION__ << ":" << __LINE__
329 << " fifo::op::GET_PART_INFO failed r=" << r << " tid=" << tid
330 << dendl;
331 }
332 return r;
333}
334
335struct partinfo_completion : public lr::ObjectOperationCompletion {
336 CephContext* cct;
337 int* rp;
338 fifo::part_header* h;
339 std::uint64_t tid;
340 partinfo_completion(CephContext* cct, int* rp, fifo::part_header* h,
341 std::uint64_t tid) :
342 cct(cct), rp(rp), h(h), tid(tid) {
343 }
344 virtual ~partinfo_completion() = default;
345 void handle_completion(int r, bufferlist& bl) override {
346 if (r >= 0) try {
347 fifo::op::get_part_info_reply reply;
348 auto iter = bl.cbegin();
349 decode(reply, iter);
350 if (h) *h = std::move(reply.header);
351 } catch (const cb::error& err) {
352 r = from_error_code(err.code());
353 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
354 << " decode failed: " << err.what()
355 << " tid=" << tid << dendl;
356 } else {
357 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
358 << " fifo::op::GET_PART_INFO failed r=" << r << " tid=" << tid
359 << dendl;
360 }
361 if (rp) {
362 *rp = r;
363 }
364 }
365};
366
367lr::ObjectReadOperation get_part_info(CephContext* cct,
368 fifo::part_header* header,
369 std::uint64_t tid, int* r = 0)
370{
371 lr::ObjectReadOperation op;
372 fifo::op::get_part_info gpi;
373
374 cb::list in;
375 cb::list bl;
376 encode(gpi, in);
377 op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in,
378 new partinfo_completion(cct, r, header, tid));
379 return op;
380}
381}
382
383std::optional<marker> FIFO::to_marker(std::string_view s)
384{
385 marker m;
386 if (s.empty()) {
387 m.num = info.tail_part_num;
388 m.ofs = 0;
389 return m;
390 }
391
392 auto pos = s.find(':');
20effc67 393 if (pos == s.npos) {
f67539c2
TL
394 return std::nullopt;
395 }
396
397 auto num = s.substr(0, pos);
398 auto ofs = s.substr(pos + 1);
399
400 auto n = ceph::parse<decltype(m.num)>(num);
401 if (!n) {
402 return std::nullopt;
403 }
404 m.num = *n;
405 auto o = ceph::parse<decltype(m.ofs)>(ofs);
406 if (!o) {
407 return std::nullopt;
408 }
409 m.ofs = *o;
410 return m;
411}
412
20effc67
TL
413int FIFO::apply_update(const DoutPrefixProvider *dpp,
414 fifo::info* info,
f67539c2
TL
415 const fifo::objv& objv,
416 const fifo::update& update,
417 std::uint64_t tid)
418{
20effc67 419 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
420 << " entering: tid=" << tid << dendl;
421 std::unique_lock l(m);
422 if (objv != info->version) {
20effc67 423 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
39ae355f 424 << " version mismatch, canceling: tid=" << tid << dendl;
f67539c2
TL
425 return -ECANCELED;
426 }
427
39ae355f 428 info->apply_update(update);
f67539c2
TL
429 return {};
430}
431
b3b6e05e 432int FIFO::_update_meta(const DoutPrefixProvider *dpp, const fifo::update& update,
f67539c2
TL
433 fifo::objv version, bool* pcanceled,
434 std::uint64_t tid, optional_yield y)
435{
b3b6e05e 436 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
437 << " entering: tid=" << tid << dendl;
438 lr::ObjectWriteOperation op;
439 bool canceled = false;
39ae355f 440 update_meta(&op, version, update);
b3b6e05e 441 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y);
f67539c2
TL
442 if (r >= 0 || r == -ECANCELED) {
443 canceled = (r == -ECANCELED);
444 if (!canceled) {
20effc67 445 r = apply_update(dpp, &info, version, update, tid);
f67539c2
TL
446 if (r < 0) canceled = true;
447 }
448 if (canceled) {
b3b6e05e 449 r = read_meta(dpp, tid, y);
f67539c2
TL
450 canceled = r < 0 ? false : true;
451 }
452 }
453 if (pcanceled) *pcanceled = canceled;
454 if (canceled) {
b3b6e05e 455 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
456 << " canceled: tid=" << tid << dendl;
457 }
458 if (r < 0) {
b3b6e05e 459 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
460 << " returning error: r=" << r << " tid=" << tid << dendl;
461 }
462 return r;
463}
464
465struct Updater : public Completion<Updater> {
466 FIFO* fifo;
467 fifo::update update;
468 fifo::objv version;
469 bool reread = false;
470 bool* pcanceled = nullptr;
471 std::uint64_t tid;
b3b6e05e 472 Updater(const DoutPrefixProvider *dpp, FIFO* fifo, lr::AioCompletion* super,
f67539c2
TL
473 const fifo::update& update, fifo::objv version,
474 bool* pcanceled, std::uint64_t tid)
b3b6e05e 475 : Completion(dpp, super), fifo(fifo), update(update), version(version),
f67539c2
TL
476 pcanceled(pcanceled) {}
477
b3b6e05e
TL
478 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
479 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
480 << " entering: tid=" << tid << dendl;
481 if (reread)
20effc67 482 handle_reread(dpp, std::move(p), r);
f67539c2 483 else
b3b6e05e 484 handle_update(dpp, std::move(p), r);
f67539c2
TL
485 }
486
b3b6e05e
TL
487 void handle_update(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
488 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
489 << " handling async update_meta: tid="
490 << tid << dendl;
491 if (r < 0 && r != -ECANCELED) {
b3b6e05e 492 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
493 << " update failed: r=" << r << " tid=" << tid << dendl;
494 complete(std::move(p), r);
495 return;
496 }
497 bool canceled = (r == -ECANCELED);
498 if (!canceled) {
20effc67 499 int r = fifo->apply_update(dpp, &fifo->info, version, update, tid);
f67539c2 500 if (r < 0) {
b3b6e05e 501 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
502 << " update failed, marking canceled: r=" << r
503 << " tid=" << tid << dendl;
504 canceled = true;
505 }
506 }
507 if (canceled) {
508 reread = true;
b3b6e05e 509 fifo->read_meta(dpp, tid, call(std::move(p)));
f67539c2
TL
510 return;
511 }
512 if (pcanceled)
513 *pcanceled = false;
b3b6e05e 514 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
515 << " completing: tid=" << tid << dendl;
516 complete(std::move(p), 0);
517 }
518
20effc67
TL
519 void handle_reread(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
520 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
521 << " handling async read_meta: tid="
522 << tid << dendl;
523 if (r < 0 && pcanceled) {
524 *pcanceled = false;
525 } else if (r >= 0 && pcanceled) {
526 *pcanceled = true;
527 }
528 if (r < 0) {
20effc67 529 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
530 << " failed dispatching read_meta: r=" << r << " tid="
531 << tid << dendl;
532 } else {
20effc67 533 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
534 << " completing: tid=" << tid << dendl;
535 }
536 complete(std::move(p), r);
537 }
538};
539
b3b6e05e 540void FIFO::_update_meta(const DoutPrefixProvider *dpp, const fifo::update& update,
f67539c2
TL
541 fifo::objv version, bool* pcanceled,
542 std::uint64_t tid, lr::AioCompletion* c)
543{
b3b6e05e 544 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
545 << " entering: tid=" << tid << dendl;
546 lr::ObjectWriteOperation op;
547 update_meta(&op, info.version, update);
b3b6e05e 548 auto updater = std::make_unique<Updater>(dpp, this, c, update, version, pcanceled,
f67539c2
TL
549 tid);
550 auto r = ioctx.aio_operate(oid, Updater::call(std::move(updater)), &op);
551 assert(r >= 0);
552}
553
39ae355f 554int FIFO::create_part(const DoutPrefixProvider *dpp, int64_t part_num, std::uint64_t tid,
f67539c2
TL
555 optional_yield y)
556{
b3b6e05e 557 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
558 << " entering: tid=" << tid << dendl;
559 lr::ObjectWriteOperation op;
560 op.create(false); /* We don't need exclusivity, part_init ensures
561 we're creating from the same journal entry. */
562 std::unique_lock l(m);
39ae355f 563 part_init(&op, info.params);
f67539c2
TL
564 auto oid = info.part_oid(part_num);
565 l.unlock();
b3b6e05e 566 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y);
f67539c2 567 if (r < 0) {
b3b6e05e 568 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
569 << " part_init failed: r=" << r << " tid="
570 << tid << dendl;
571 }
572 return r;
573}
574
39ae355f 575int FIFO::remove_part(const DoutPrefixProvider *dpp, int64_t part_num, std::uint64_t tid,
f67539c2
TL
576 optional_yield y)
577{
b3b6e05e 578 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
579 << " entering: tid=" << tid << dendl;
580 lr::ObjectWriteOperation op;
581 op.remove();
582 std::unique_lock l(m);
583 auto oid = info.part_oid(part_num);
584 l.unlock();
b3b6e05e 585 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y);
f67539c2 586 if (r < 0) {
b3b6e05e 587 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
588 << " remove failed: r=" << r << " tid="
589 << tid << dendl;
590 }
591 return r;
592}
593
b3b6e05e 594int FIFO::process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y)
f67539c2 595{
b3b6e05e 596 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
597 << " entering: tid=" << tid << dendl;
598 std::vector<fifo::journal_entry> processed;
599
600 std::unique_lock l(m);
601 auto tmpjournal = info.journal;
602 auto new_tail = info.tail_part_num;
603 auto new_head = info.head_part_num;
604 auto new_max = info.max_push_part_num;
605 l.unlock();
606
607 int r = 0;
39ae355f 608 for (auto& entry : tmpjournal) {
b3b6e05e 609 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
610 << " processing entry: entry=" << entry << " tid=" << tid
611 << dendl;
612 switch (entry.op) {
1e59de90
TL
613 using enum fifo::journal_entry::Op;
614 case create:
39ae355f 615 r = create_part(dpp, entry.part_num, tid, y);
f67539c2
TL
616 if (entry.part_num > new_max) {
617 new_max = entry.part_num;
618 }
619 break;
1e59de90 620 case set_head:
f67539c2
TL
621 r = 0;
622 if (entry.part_num > new_head) {
623 new_head = entry.part_num;
624 }
625 break;
1e59de90 626 case remove:
39ae355f 627 r = remove_part(dpp, entry.part_num, tid, y);
f67539c2
TL
628 if (r == -ENOENT) r = 0;
629 if (entry.part_num >= new_tail) {
630 new_tail = entry.part_num + 1;
631 }
632 break;
633 default:
b3b6e05e 634 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
635 << " unknown journaled op: entry=" << entry << " tid="
636 << tid << dendl;
637 return -EIO;
638 }
639
640 if (r < 0) {
b3b6e05e 641 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
642 << " processing entry failed: entry=" << entry
643 << " r=" << r << " tid=" << tid << dendl;
644 return -r;
645 }
646
647 processed.push_back(std::move(entry));
648 }
649
650 // Postprocess
651 bool canceled = true;
652
653 for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) {
b3b6e05e 654 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
655 << " postprocessing: i=" << i << " tid=" << tid << dendl;
656
657 std::optional<int64_t> tail_part_num;
658 std::optional<int64_t> head_part_num;
659 std::optional<int64_t> max_part_num;
660
661 std::unique_lock l(m);
662 auto objv = info.version;
663 if (new_tail > tail_part_num) tail_part_num = new_tail;
664 if (new_head > info.head_part_num) head_part_num = new_head;
665 if (new_max > info.max_push_part_num) max_part_num = new_max;
666 l.unlock();
667
668 if (processed.empty() &&
669 !tail_part_num &&
670 !max_part_num) {
b3b6e05e 671 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
672 << " nothing to update any more: i=" << i << " tid="
673 << tid << dendl;
674 canceled = false;
675 break;
676 }
677 auto u = fifo::update().tail_part_num(tail_part_num)
678 .head_part_num(head_part_num).max_push_part_num(max_part_num)
679 .journal_entries_rm(processed);
b3b6e05e 680 r = _update_meta(dpp, u, objv, &canceled, tid, y);
f67539c2 681 if (r < 0) {
b3b6e05e 682 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
683 << " _update_meta failed: update=" << u
684 << " r=" << r << " tid=" << tid << dendl;
685 break;
686 }
687
688 if (canceled) {
689 std::vector<fifo::journal_entry> new_processed;
690 std::unique_lock l(m);
b3b6e05e 691 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
692 << " update canceled, retrying: i=" << i << " tid="
693 << tid << dendl;
694 for (auto& e : processed) {
39ae355f
TL
695 if (info.journal.contains(e)) {
696 new_processed.push_back(e);
f67539c2 697 }
f67539c2
TL
698 }
699 processed = std::move(new_processed);
700 }
701 }
702 if (r == 0 && canceled) {
b3b6e05e 703 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
704 << " canceled too many times, giving up: tid=" << tid << dendl;
705 r = -ECANCELED;
706 }
707 if (r < 0) {
b3b6e05e 708 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
709 << " failed, r=: " << r << " tid=" << tid << dendl;
710 }
711 return r;
712}
713
39ae355f
TL
714int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp,
715 std::int64_t new_part_num, bool is_head,
716 std::uint64_t tid, optional_yield y)
f67539c2 717{
b3b6e05e 718 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
719 << " entering: tid=" << tid << dendl;
720 std::unique_lock l(m);
1e59de90
TL
721 using enum fifo::journal_entry::Op;
722 std::vector<fifo::journal_entry> jentries{{ create, new_part_num }};
723 if (info.journal.contains({create, new_part_num}) &&
724 (!is_head || info.journal.contains({set_head, new_part_num}))) {
f67539c2 725 l.unlock();
b3b6e05e 726 ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
727 << " new part journaled, but not processed: tid="
728 << tid << dendl;
b3b6e05e 729 auto r = process_journal(dpp, tid, y);
f67539c2 730 if (r < 0) {
b3b6e05e 731 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
732 << " process_journal failed: r=" << r << " tid=" << tid << dendl;
733 }
734 return r;
735 }
f67539c2
TL
736 auto version = info.version;
737
738 if (is_head) {
b3b6e05e 739 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 740 << " needs new head: tid=" << tid << dendl;
1e59de90 741 jentries.push_back({ set_head, new_part_num });
f67539c2
TL
742 }
743 l.unlock();
744
745 int r = 0;
746 bool canceled = true;
747 for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) {
748 canceled = false;
b3b6e05e 749 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
750 << " updating metadata: i=" << i << " tid=" << tid << dendl;
751 auto u = fifo::update{}.journal_entries_add(jentries);
b3b6e05e 752 r = _update_meta(dpp, u, version, &canceled, tid, y);
f67539c2
TL
753 if (r >= 0 && canceled) {
754 std::unique_lock l(m);
39ae355f 755 version = info.version;
1e59de90
TL
756 auto found = (info.journal.contains({create, new_part_num}) ||
757 info.journal.contains({set_head, new_part_num}));
39ae355f
TL
758 if ((info.max_push_part_num >= new_part_num &&
759 info.head_part_num >= new_part_num)) {
b3b6e05e 760 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
761 << " raced, but journaled and processed: i=" << i
762 << " tid=" << tid << dendl;
763 return 0;
764 }
765 if (found) {
b3b6e05e 766 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
767 << " raced, journaled but not processed: i=" << i
768 << " tid=" << tid << dendl;
769 canceled = false;
770 }
771 l.unlock();
772 }
773 if (r < 0) {
b3b6e05e 774 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
775 << " _update_meta failed: update=" << u << " r=" << r
776 << " tid=" << tid << dendl;
777 return r;
778 }
779 }
780 if (canceled) {
b3b6e05e 781 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
782 << " canceled too many times, giving up: tid=" << tid << dendl;
783 return -ECANCELED;
784 }
b3b6e05e 785 r = process_journal(dpp, tid, y);
f67539c2 786 if (r < 0) {
b3b6e05e 787 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
788 << " process_journal failed: r=" << r << " tid=" << tid << dendl;
789 }
790 return r;
791}
792
39ae355f
TL
793int FIFO::_prepare_new_head(const DoutPrefixProvider *dpp,
794 std::int64_t new_head_part_num,
795 std::uint64_t tid, optional_yield y)
f67539c2 796{
b3b6e05e 797 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
39ae355f 798 << " entering: tid=" << tid << dendl;
f67539c2 799 std::unique_lock l(m);
f67539c2
TL
800 auto max_push_part_num = info.max_push_part_num;
801 auto version = info.version;
802 l.unlock();
803
804 int r = 0;
39ae355f 805 if (max_push_part_num < new_head_part_num) {
b3b6e05e 806 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 807 << " need new part: tid=" << tid << dendl;
39ae355f 808 r = _prepare_new_part(dpp, new_head_part_num, true, tid, y);
f67539c2 809 if (r < 0) {
b3b6e05e 810 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
811 << " _prepare_new_part failed: r=" << r
812 << " tid=" << tid << dendl;
813 return r;
814 }
815 std::unique_lock l(m);
39ae355f 816 if (info.max_push_part_num < new_head_part_num) {
b3b6e05e 817 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
818 << " inconsistency, push part less than head part: "
819 << " tid=" << tid << dendl;
820 return -EIO;
821 }
822 l.unlock();
823 return 0;
824 }
825
1e59de90 826 using enum fifo::journal_entry::Op;
39ae355f 827 fifo::journal_entry jentry;
1e59de90 828 jentry.op = set_head;
39ae355f
TL
829 jentry.part_num = new_head_part_num;
830
831 r = 0;
f67539c2
TL
832 bool canceled = true;
833 for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) {
39ae355f 834 canceled = false;
b3b6e05e 835 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
39ae355f
TL
836 << " updating metadata: i=" << i << " tid=" << tid << dendl;
837 auto u = fifo::update{}.journal_entries_add({{ jentry }});
b3b6e05e 838 r = _update_meta(dpp, u, version, &canceled, tid, y);
39ae355f
TL
839 if (r >= 0 && canceled) {
840 std::unique_lock l(m);
1e59de90
TL
841 auto found = (info.journal.contains({create, new_head_part_num}) ||
842 info.journal.contains({set_head, new_head_part_num}));
39ae355f
TL
843 version = info.version;
844 if ((info.head_part_num >= new_head_part_num)) {
845 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
846 << " raced, but journaled and processed: i=" << i
847 << " tid=" << tid << dendl;
848 return 0;
849 }
850 if (found) {
851 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
852 << " raced, journaled but not processed: i=" << i
853 << " tid=" << tid << dendl;
854 canceled = false;
855 }
856 l.unlock();
857 }
f67539c2 858 if (r < 0) {
b3b6e05e 859 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
860 << " _update_meta failed: update=" << u << " r=" << r
861 << " tid=" << tid << dendl;
862 return r;
863 }
f67539c2
TL
864 }
865 if (canceled) {
b3b6e05e 866 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
867 << " canceled too many times, giving up: tid=" << tid << dendl;
868 return -ECANCELED;
869 }
39ae355f
TL
870 r = process_journal(dpp, tid, y);
871 if (r < 0) {
872 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
873 << " process_journal failed: r=" << r << " tid=" << tid << dendl;
874 }
875 return r;
f67539c2
TL
876}
877
878struct NewPartPreparer : public Completion<NewPartPreparer> {
879 FIFO* f;
880 std::vector<fifo::journal_entry> jentries;
881 int i = 0;
39ae355f 882 std::int64_t new_part_num;
f67539c2
TL
883 bool canceled = false;
884 uint64_t tid;
885
b3b6e05e 886 NewPartPreparer(const DoutPrefixProvider *dpp, FIFO* f, lr::AioCompletion* super,
f67539c2 887 std::vector<fifo::journal_entry> jentries,
39ae355f 888 std::int64_t new_part_num,
f67539c2 889 std::uint64_t tid)
b3b6e05e 890 : Completion(dpp, super), f(f), jentries(std::move(jentries)),
39ae355f 891 new_part_num(new_part_num), tid(tid) {}
f67539c2 892
b3b6e05e
TL
893 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
894 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
895 << " entering: tid=" << tid << dendl;
896 if (r < 0) {
b3b6e05e 897 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
898 << " _update_meta failed: r=" << r
899 << " tid=" << tid << dendl;
900 complete(std::move(p), r);
901 return;
902 }
903
904 if (canceled) {
1e59de90 905 using enum fifo::journal_entry::Op;
f67539c2 906 std::unique_lock l(f->m);
1e59de90
TL
907 auto found = (f->info.journal.contains({create, new_part_num}) ||
908 f->info.journal.contains({set_head, new_part_num}));
f67539c2
TL
909 auto max_push_part_num = f->info.max_push_part_num;
910 auto head_part_num = f->info.head_part_num;
911 auto version = f->info.version;
f67539c2 912 l.unlock();
39ae355f
TL
913 if ((max_push_part_num >= new_part_num &&
914 head_part_num >= new_part_num)) {
b3b6e05e 915 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
916 << " raced, but journaled and processed: i=" << i
917 << " tid=" << tid << dendl;
918 complete(std::move(p), 0);
919 return;
920 }
921 if (i >= MAX_RACE_RETRIES) {
922 complete(std::move(p), -ECANCELED);
923 return;
924 }
925 if (!found) {
926 ++i;
b3b6e05e 927 f->_update_meta(dpp, fifo::update{}
f67539c2
TL
928 .journal_entries_add(jentries),
929 version, &canceled, tid, call(std::move(p)));
930 return;
931 } else {
b3b6e05e 932 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
933 << " raced, journaled but not processed: i=" << i
934 << " tid=" << tid << dendl;
935 canceled = false;
936 }
937 // Fall through. We still need to process the journal.
938 }
b3b6e05e 939 f->process_journal(dpp, tid, super());
f67539c2
TL
940 return;
941 }
942};
943
39ae355f
TL
944void FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, std::int64_t new_part_num,
945 bool is_head, std::uint64_t tid, lr::AioCompletion* c)
f67539c2
TL
946{
947 std::unique_lock l(m);
1e59de90
TL
948 using enum fifo::journal_entry::Op;
949 std::vector<fifo::journal_entry> jentries{{create, new_part_num}};
950 if (info.journal.contains({create, new_part_num}) &&
951 (!is_head || info.journal.contains({set_head, new_part_num}))) {
f67539c2 952 l.unlock();
b3b6e05e 953 ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
954 << " new part journaled, but not processed: tid="
955 << tid << dendl;
b3b6e05e 956 process_journal(dpp, tid, c);
f67539c2
TL
957 return;
958 }
f67539c2
TL
959 auto version = info.version;
960
961 if (is_head) {
1e59de90 962 jentries.push_back({ set_head, new_part_num });
f67539c2
TL
963 }
964 l.unlock();
965
b3b6e05e 966 auto n = std::make_unique<NewPartPreparer>(dpp, this, c, jentries,
39ae355f 967 new_part_num, tid);
f67539c2 968 auto np = n.get();
b3b6e05e 969 _update_meta(dpp, fifo::update{}.journal_entries_add(jentries), version,
f67539c2
TL
970 &np->canceled, tid, NewPartPreparer::call(std::move(n)));
971}
972
973struct NewHeadPreparer : public Completion<NewHeadPreparer> {
974 FIFO* f;
975 int i = 0;
976 bool newpart;
39ae355f 977 std::int64_t new_head_part_num;
f67539c2
TL
978 bool canceled = false;
979 std::uint64_t tid;
980
b3b6e05e 981 NewHeadPreparer(const DoutPrefixProvider *dpp, FIFO* f, lr::AioCompletion* super,
39ae355f
TL
982 bool newpart, std::int64_t new_head_part_num,
983 std::uint64_t tid)
984 : Completion(dpp, super), f(f), newpart(newpart),
985 new_head_part_num(new_head_part_num), tid(tid) {}
f67539c2 986
b3b6e05e 987 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
f67539c2
TL
988 if (newpart)
989 handle_newpart(std::move(p), r);
990 else
b3b6e05e 991 handle_update(dpp, std::move(p), r);
f67539c2
TL
992 }
993
994 void handle_newpart(Ptr&& p, int r) {
995 if (r < 0) {
996 lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
997 << " _prepare_new_part failed: r=" << r
998 << " tid=" << tid << dendl;
999 complete(std::move(p), r);
1000 return;
1001 }
1002 std::unique_lock l(f->m);
39ae355f 1003 if (f->info.max_push_part_num < new_head_part_num) {
f67539c2
TL
1004 l.unlock();
1005 lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1006 << " _prepare_new_part failed: r=" << r
1007 << " tid=" << tid << dendl;
1008 complete(std::move(p), -EIO);
1009 } else {
1010 l.unlock();
1011 complete(std::move(p), 0);
1012 }
1013 }
1014
b3b6e05e 1015 void handle_update(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
39ae355f
TL
1016 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1017 << " entering: tid=" << tid << dendl;
f67539c2 1018 if (r < 0) {
b3b6e05e 1019 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
39ae355f 1020 << " _update_meta failed: r=" << r
f67539c2
TL
1021 << " tid=" << tid << dendl;
1022 complete(std::move(p), r);
1023 return;
1024 }
39ae355f 1025
f67539c2 1026 if (canceled) {
1e59de90 1027 using enum fifo::journal_entry::Op;
39ae355f 1028 std::unique_lock l(f->m);
1e59de90
TL
1029 auto found = (f->info.journal.contains({create, new_head_part_num }) ||
1030 f->info.journal.contains({set_head, new_head_part_num }));
39ae355f
TL
1031 auto head_part_num = f->info.head_part_num;
1032 auto version = f->info.version;
1033
1034 l.unlock();
1035 if ((head_part_num >= new_head_part_num)) {
1036 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1037 << " raced, but journaled and processed: i=" << i
1038 << " tid=" << tid << dendl;
1039 complete(std::move(p), 0);
1040 return;
1041 }
f67539c2 1042 if (i >= MAX_RACE_RETRIES) {
f67539c2
TL
1043 complete(std::move(p), -ECANCELED);
1044 return;
1045 }
39ae355f 1046 if (!found) {
f67539c2 1047 ++i;
39ae355f 1048 fifo::journal_entry jentry;
1e59de90 1049 jentry.op = set_head;
39ae355f
TL
1050 jentry.part_num = new_head_part_num;
1051 f->_update_meta(dpp, fifo::update{}
1052 .journal_entries_add({{jentry}}),
1053 version, &canceled, tid, call(std::move(p)));
f67539c2 1054 return;
39ae355f
TL
1055 } else {
1056 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1057 << " raced, journaled but not processed: i=" << i
1058 << " tid=" << tid << dendl;
1059 canceled = false;
f67539c2 1060 }
39ae355f 1061 // Fall through. We still need to process the journal.
f67539c2 1062 }
39ae355f 1063 f->process_journal(dpp, tid, super());
f67539c2
TL
1064 return;
1065 }
1066};
1067
39ae355f
TL
1068void FIFO::_prepare_new_head(const DoutPrefixProvider *dpp, std::int64_t new_head_part_num,
1069 std::uint64_t tid, lr::AioCompletion* c)
f67539c2 1070{
b3b6e05e 1071 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1072 << " entering: tid=" << tid << dendl;
1073 std::unique_lock l(m);
f67539c2
TL
1074 auto max_push_part_num = info.max_push_part_num;
1075 auto version = info.version;
1076 l.unlock();
1077
39ae355f 1078 if (max_push_part_num < new_head_part_num) {
b3b6e05e 1079 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 1080 << " need new part: tid=" << tid << dendl;
39ae355f 1081 auto n = std::make_unique<NewHeadPreparer>(dpp, this, c, true, new_head_part_num,
f67539c2 1082 tid);
39ae355f
TL
1083 _prepare_new_part(dpp, new_head_part_num, true, tid,
1084 NewHeadPreparer::call(std::move(n)));
f67539c2 1085 } else {
b3b6e05e 1086 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 1087 << " updating head: tid=" << tid << dendl;
39ae355f 1088 auto n = std::make_unique<NewHeadPreparer>(dpp, this, c, false, new_head_part_num,
f67539c2
TL
1089 tid);
1090 auto np = n.get();
1e59de90 1091 using enum fifo::journal_entry::Op;
39ae355f 1092 fifo::journal_entry jentry;
1e59de90 1093 jentry.op = set_head;
39ae355f
TL
1094 jentry.part_num = new_head_part_num;
1095 _update_meta(dpp, fifo::update{}.journal_entries_add({{jentry}}), version,
f67539c2
TL
1096 &np->canceled, tid, NewHeadPreparer::call(std::move(n)));
1097 }
1098}
1099
b3b6e05e 1100int FIFO::push_entries(const DoutPrefixProvider *dpp, const std::deque<cb::list>& data_bufs,
f67539c2
TL
1101 std::uint64_t tid, optional_yield y)
1102{
b3b6e05e 1103 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1104 << " entering: tid=" << tid << dendl;
1105 std::unique_lock l(m);
1106 auto head_part_num = info.head_part_num;
f67539c2
TL
1107 const auto part_oid = info.part_oid(head_part_num);
1108 l.unlock();
1109
39ae355f 1110 auto r = push_part(dpp, ioctx, part_oid, data_bufs, tid, y);
f67539c2 1111 if (r < 0) {
b3b6e05e 1112 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1113 << " push_part failed: r=" << r << " tid=" << tid << dendl;
1114 }
1115 return r;
1116}
1117
1118void FIFO::push_entries(const std::deque<cb::list>& data_bufs,
1119 std::uint64_t tid, lr::AioCompletion* c)
1120{
1121 std::unique_lock l(m);
1122 auto head_part_num = info.head_part_num;
f67539c2
TL
1123 const auto part_oid = info.part_oid(head_part_num);
1124 l.unlock();
1125
39ae355f 1126 push_part(ioctx, part_oid, data_bufs, tid, c);
f67539c2
TL
1127}
1128
b3b6e05e 1129int FIFO::trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs,
f67539c2
TL
1130 bool exclusive, std::uint64_t tid,
1131 optional_yield y)
1132{
b3b6e05e 1133 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1134 << " entering: tid=" << tid << dendl;
1135 lr::ObjectWriteOperation op;
1136 std::unique_lock l(m);
1137 const auto part_oid = info.part_oid(part_num);
1138 l.unlock();
39ae355f 1139 rgw::cls::fifo::trim_part(&op, ofs, exclusive);
b3b6e05e 1140 auto r = rgw_rados_operate(dpp, ioctx, part_oid, &op, y);
f67539c2 1141 if (r < 0) {
b3b6e05e 1142 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1143 << " trim_part failed: r=" << r << " tid=" << tid << dendl;
1144 }
1145 return 0;
1146}
1147
20effc67 1148void FIFO::trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs,
f67539c2
TL
1149 bool exclusive, std::uint64_t tid,
1150 lr::AioCompletion* c)
1151{
20effc67 1152 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1153 << " entering: tid=" << tid << dendl;
1154 lr::ObjectWriteOperation op;
1155 std::unique_lock l(m);
1156 const auto part_oid = info.part_oid(part_num);
1157 l.unlock();
39ae355f 1158 rgw::cls::fifo::trim_part(&op, ofs, exclusive);
f67539c2
TL
1159 auto r = ioctx.aio_operate(part_oid, c, &op);
1160 ceph_assert(r >= 0);
1161}
1162
b3b6e05e 1163int FIFO::open(const DoutPrefixProvider *dpp, lr::IoCtx ioctx, std::string oid, std::unique_ptr<FIFO>* fifo,
f67539c2
TL
1164 optional_yield y, std::optional<fifo::objv> objv,
1165 bool probe)
1166{
b3b6e05e 1167 ldpp_dout(dpp, 20)
f67539c2
TL
1168 << __PRETTY_FUNCTION__ << ":" << __LINE__
1169 << " entering" << dendl;
1170 fifo::info info;
1171 std::uint32_t size;
1172 std::uint32_t over;
b3b6e05e 1173 int r = get_meta(dpp, ioctx, std::move(oid), objv, &info, &size, &over, 0, y,
f67539c2
TL
1174 probe);
1175 if (r < 0) {
1176 if (!(probe && (r == -ENOENT || r == -ENODATA))) {
b3b6e05e 1177 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1178 << " get_meta failed: r=" << r << dendl;
1179 }
1180 return r;
1181 }
1182 std::unique_ptr<FIFO> f(new FIFO(std::move(ioctx), oid));
1183 f->info = info;
1184 f->part_header_size = size;
1185 f->part_entry_overhead = over;
1186 // If there are journal entries, process them, in case
1187 // someone crashed mid-transaction.
1188 if (!info.journal.empty()) {
b3b6e05e 1189 ldpp_dout(dpp, 20)
f67539c2
TL
1190 << __PRETTY_FUNCTION__ << ":" << __LINE__
1191 << " processing leftover journal" << dendl;
b3b6e05e 1192 r = f->process_journal(dpp, 0, y);
f67539c2 1193 if (r < 0) {
b3b6e05e 1194 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1195 << " process_journal failed: r=" << r << dendl;
1196 return r;
1197 }
1198 }
1199 *fifo = std::move(f);
1200 return 0;
1201}
1202
b3b6e05e 1203int FIFO::create(const DoutPrefixProvider *dpp, lr::IoCtx ioctx, std::string oid, std::unique_ptr<FIFO>* fifo,
f67539c2
TL
1204 optional_yield y, std::optional<fifo::objv> objv,
1205 std::optional<std::string_view> oid_prefix,
1206 bool exclusive, std::uint64_t max_part_size,
1207 std::uint64_t max_entry_size)
1208{
b3b6e05e 1209 ldpp_dout(dpp, 20)
f67539c2
TL
1210 << __PRETTY_FUNCTION__ << ":" << __LINE__
1211 << " entering" << dendl;
1212 lr::ObjectWriteOperation op;
1213 create_meta(&op, oid, objv, oid_prefix, exclusive, max_part_size,
1214 max_entry_size);
b3b6e05e 1215 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y);
f67539c2 1216 if (r < 0) {
b3b6e05e 1217 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1218 << " create_meta failed: r=" << r << dendl;
1219 return r;
1220 }
b3b6e05e 1221 r = open(dpp, std::move(ioctx), std::move(oid), fifo, y, objv);
f67539c2
TL
1222 return r;
1223}
1224
b3b6e05e
TL
1225int FIFO::read_meta(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y) {
1226 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1227 << " entering: tid=" << tid << dendl;
1228 fifo::info _info;
1229 std::uint32_t _phs;
1230 std::uint32_t _peo;
1231
20effc67 1232 auto r = get_meta(dpp, ioctx, oid, std::nullopt, &_info, &_phs, &_peo, tid, y);
f67539c2 1233 if (r < 0) {
b3b6e05e 1234 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1235 << " get_meta failed: r=" << r << " tid=" << tid << dendl;
1236 return r;
1237 }
1238 std::unique_lock l(m);
1239 // We have a newer version already!
1240 if (_info.version.same_or_later(this->info.version)) {
1241 info = std::move(_info);
1242 part_header_size = _phs;
1243 part_entry_overhead = _peo;
1244 }
1245 return 0;
1246}
1247
b3b6e05e 1248int FIFO::read_meta(const DoutPrefixProvider *dpp, optional_yield y) {
f67539c2
TL
1249 std::unique_lock l(m);
1250 auto tid = ++next_tid;
1251 l.unlock();
b3b6e05e 1252 return read_meta(dpp, tid, y);
f67539c2
TL
1253}
1254
1255struct Reader : public Completion<Reader> {
1256 FIFO* fifo;
1257 cb::list bl;
1258 std::uint64_t tid;
b3b6e05e
TL
1259 Reader(const DoutPrefixProvider *dpp, FIFO* fifo, lr::AioCompletion* super, std::uint64_t tid)
1260 : Completion(dpp, super), fifo(fifo), tid(tid) {}
f67539c2 1261
b3b6e05e
TL
1262 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
1263 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1264 << " entering: tid=" << tid << dendl;
1265 if (r >= 0) try {
1266 fifo::op::get_meta_reply reply;
1267 auto iter = bl.cbegin();
1268 decode(reply, iter);
1269 std::unique_lock l(fifo->m);
1270 if (reply.info.version.same_or_later(fifo->info.version)) {
1271 fifo->info = std::move(reply.info);
1272 fifo->part_header_size = reply.part_header_size;
1273 fifo->part_entry_overhead = reply.part_entry_overhead;
1274 }
1275 } catch (const cb::error& err) {
b3b6e05e 1276 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1277 << " failed to decode response err=" << err.what()
1278 << " tid=" << tid << dendl;
1279 r = from_error_code(err.code());
1280 } else {
b3b6e05e 1281 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1282 << " read_meta failed r=" << r
1283 << " tid=" << tid << dendl;
1284 }
1285 complete(std::move(p), r);
1286 }
1287};
1288
b3b6e05e 1289void FIFO::read_meta(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c)
f67539c2 1290{
b3b6e05e 1291 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1292 << " entering: tid=" << tid << dendl;
1293 lr::ObjectReadOperation op;
1294 fifo::op::get_meta gm;
1295 cb::list in;
1296 encode(gm, in);
b3b6e05e 1297 auto reader = std::make_unique<Reader>(dpp, this, c, tid);
f67539c2
TL
1298 auto rp = reader.get();
1299 auto r = ioctx.aio_exec(oid, Reader::call(std::move(reader)), fifo::op::CLASS,
1300 fifo::op::GET_META, in, &rp->bl);
1301 assert(r >= 0);
1302}
1303
1304const fifo::info& FIFO::meta() const {
1305 return info;
1306}
1307
1308std::pair<std::uint32_t, std::uint32_t> FIFO::get_part_layout_info() const {
1309 return {part_header_size, part_entry_overhead};
1310}
1311
b3b6e05e
TL
1312int FIFO::push(const DoutPrefixProvider *dpp, const cb::list& bl, optional_yield y) {
1313 return push(dpp, std::vector{ bl }, y);
f67539c2
TL
1314}
1315
b3b6e05e
TL
1316void FIFO::push(const DoutPrefixProvider *dpp, const cb::list& bl, lr::AioCompletion* c) {
1317 push(dpp, std::vector{ bl }, c);
f67539c2
TL
1318}
1319
b3b6e05e 1320int FIFO::push(const DoutPrefixProvider *dpp, const std::vector<cb::list>& data_bufs, optional_yield y)
f67539c2
TL
1321{
1322 std::unique_lock l(m);
1323 auto tid = ++next_tid;
1324 auto max_entry_size = info.params.max_entry_size;
1325 auto need_new_head = info.need_new_head();
39ae355f 1326 auto head_part_num = info.head_part_num;
f67539c2 1327 l.unlock();
b3b6e05e 1328 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1329 << " entering: tid=" << tid << dendl;
1330 if (data_bufs.empty()) {
b3b6e05e 1331 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1332 << " empty push, returning success tid=" << tid << dendl;
1333 return 0;
1334 }
1335
1336 // Validate sizes
1337 for (const auto& bl : data_bufs) {
1338 if (bl.length() > max_entry_size) {
b3b6e05e 1339 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1340 << " entry bigger than max_entry_size tid=" << tid << dendl;
1341 return -E2BIG;
1342 }
1343 }
1344
1345 int r = 0;
1346 if (need_new_head) {
b3b6e05e 1347 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 1348 << " need new head tid=" << tid << dendl;
39ae355f 1349 r = _prepare_new_head(dpp, head_part_num + 1, tid, y);
f67539c2 1350 if (r < 0) {
b3b6e05e 1351 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1352 << " _prepare_new_head failed: r=" << r
1353 << " tid=" << tid << dendl;
1354 return r;
1355 }
1356 }
1357
1358 std::deque<cb::list> remaining(data_bufs.begin(), data_bufs.end());
1359 std::deque<cb::list> batch;
1360
1361 uint64_t batch_len = 0;
1362 auto retries = 0;
1363 bool canceled = true;
1364 while ((!remaining.empty() || !batch.empty()) &&
1365 (retries <= MAX_RACE_RETRIES)) {
b3b6e05e 1366 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1367 << " preparing push: remaining=" << remaining.size()
1368 << " batch=" << batch.size() << " retries=" << retries
1369 << " tid=" << tid << dendl;
1370 std::unique_lock l(m);
39ae355f 1371 head_part_num = info.head_part_num;
f67539c2
TL
1372 auto max_part_size = info.params.max_part_size;
1373 auto overhead = part_entry_overhead;
1374 l.unlock();
1375
1376 while (!remaining.empty() &&
1377 (remaining.front().length() + batch_len <= max_part_size)) {
1378 /* We can send entries with data_len up to max_entry_size,
1379 however, we want to also account the overhead when
1380 dealing with multiple entries. Previous check doesn't
1381 account for overhead on purpose. */
1382 batch_len += remaining.front().length() + overhead;
1383 batch.push_back(std::move(remaining.front()));
1384 remaining.pop_front();
1385 }
b3b6e05e 1386 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1387 << " prepared push: remaining=" << remaining.size()
1388 << " batch=" << batch.size() << " retries=" << retries
1389 << " batch_len=" << batch_len
1390 << " tid=" << tid << dendl;
1391
b3b6e05e 1392 auto r = push_entries(dpp, batch, tid, y);
f67539c2
TL
1393 if (r == -ERANGE) {
1394 canceled = true;
1395 ++retries;
b3b6e05e 1396 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
39ae355f
TL
1397 << " need new head tid=" << tid << dendl;
1398 r = _prepare_new_head(dpp, head_part_num + 1, tid, y);
f67539c2 1399 if (r < 0) {
b3b6e05e 1400 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1401 << " prepare_new_head failed: r=" << r
1402 << " tid=" << tid << dendl;
1403 return r;
1404 }
1405 r = 0;
1406 continue;
1407 }
39ae355f
TL
1408 if (r == -ENOENT) {
1409 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1410 << " racing client trimmed part, rereading metadata "
1411 << "tid=" << tid << dendl;
1412 canceled = true;
1413 ++retries;
1414 r = read_meta(dpp, y);
1415 if (r < 0) {
1416 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1417 << " read_meta failed: r=" << r
1418 << " tid=" << tid << dendl;
1419 return r;
1420 }
1421 r = 0;
1422 continue;
1423 }
f67539c2 1424 if (r < 0) {
b3b6e05e 1425 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1426 << " push_entries failed: r=" << r
1427 << " tid=" << tid << dendl;
1428 return r;
1429 }
1430 // Made forward progress!
1431 canceled = false;
1432 retries = 0;
1433 batch_len = 0;
1e59de90 1434 if (r == ssize(batch)) {
f67539c2
TL
1435 batch.clear();
1436 } else {
1437 batch.erase(batch.begin(), batch.begin() + r);
1438 for (const auto& b : batch) {
1439 batch_len += b.length() + part_entry_overhead;
1440 }
1441 }
1442 }
1443 if (canceled) {
b3b6e05e 1444 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1445 << " canceled too many times, giving up: tid=" << tid << dendl;
1446 return -ECANCELED;
1447 }
1448 return 0;
1449}
1450
1451struct Pusher : public Completion<Pusher> {
1452 FIFO* f;
1453 std::deque<cb::list> remaining;
1454 std::deque<cb::list> batch;
1455 int i = 0;
39ae355f 1456 std::int64_t head_part_num;
f67539c2 1457 std::uint64_t tid;
39ae355f 1458 enum { pushing, new_heading, meta_reading } state = pushing;
f67539c2 1459
20effc67 1460 void prep_then_push(const DoutPrefixProvider *dpp, Ptr&& p, const unsigned successes) {
f67539c2
TL
1461 std::unique_lock l(f->m);
1462 auto max_part_size = f->info.params.max_part_size;
1463 auto part_entry_overhead = f->part_entry_overhead;
39ae355f 1464 head_part_num = f->info.head_part_num;
f67539c2
TL
1465 l.unlock();
1466
20effc67 1467 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1468 << " preparing push: remaining=" << remaining.size()
1469 << " batch=" << batch.size() << " i=" << i
1470 << " tid=" << tid << dendl;
1471
1472 uint64_t batch_len = 0;
1473 if (successes > 0) {
1474 if (successes == batch.size()) {
1475 batch.clear();
1476 } else {
1477 batch.erase(batch.begin(), batch.begin() + successes);
1478 for (const auto& b : batch) {
1479 batch_len += b.length() + part_entry_overhead;
1480 }
1481 }
1482 }
1483
1484 if (batch.empty() && remaining.empty()) {
1485 complete(std::move(p), 0);
1486 return;
1487 }
1488
1489 while (!remaining.empty() &&
1490 (remaining.front().length() + batch_len <= max_part_size)) {
1491
1492 /* We can send entries with data_len up to max_entry_size,
1493 however, we want to also account the overhead when
1494 dealing with multiple entries. Previous check doesn't
1495 account for overhead on purpose. */
1496 batch_len += remaining.front().length() + part_entry_overhead;
1497 batch.push_back(std::move(remaining.front()));
1498 remaining.pop_front();
1499 }
20effc67 1500 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1501 << " prepared push: remaining=" << remaining.size()
1502 << " batch=" << batch.size() << " i=" << i
1503 << " batch_len=" << batch_len
1504 << " tid=" << tid << dendl;
1505 push(std::move(p));
1506 }
1507
1508 void push(Ptr&& p) {
1509 f->push_entries(batch, tid, call(std::move(p)));
1510 }
1511
b3b6e05e 1512 void new_head(const DoutPrefixProvider *dpp, Ptr&& p) {
39ae355f
TL
1513 state = new_heading;
1514 f->_prepare_new_head(dpp, head_part_num + 1, tid, call(std::move(p)));
1515 }
1516
1517 void read_meta(const DoutPrefixProvider *dpp, Ptr&& p) {
1518 ++i;
1519 state = meta_reading;
1520 f->read_meta(dpp, tid, call(std::move(p)));
f67539c2
TL
1521 }
1522
b3b6e05e 1523 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
39ae355f
TL
1524 switch (state) {
1525 case pushing:
f67539c2 1526 if (r == -ERANGE) {
b3b6e05e 1527 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 1528 << " need new head tid=" << tid << dendl;
b3b6e05e 1529 new_head(dpp, std::move(p));
f67539c2
TL
1530 return;
1531 }
39ae355f
TL
1532 if (r == -ENOENT) {
1533 if (i > MAX_RACE_RETRIES) {
1534 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1535 << " racing client deleted part, but we're out"
1536 << " of retries: tid=" << tid << dendl;
1537 complete(std::move(p), r);
1538 }
1539 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1540 << " racing client deleted part: tid=" << tid << dendl;
1541 read_meta(dpp, std::move(p));
1542 return;
1543 }
f67539c2 1544 if (r < 0) {
b3b6e05e 1545 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1546 << " push_entries failed: r=" << r
1547 << " tid=" << tid << dendl;
1548 complete(std::move(p), r);
1549 return;
1550 }
1551 i = 0; // We've made forward progress, so reset the race counter!
20effc67 1552 prep_then_push(dpp, std::move(p), r);
39ae355f
TL
1553 break;
1554
1555 case new_heading:
f67539c2 1556 if (r < 0) {
b3b6e05e 1557 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1558 << " prepare_new_head failed: r=" << r
1559 << " tid=" << tid << dendl;
1560 complete(std::move(p), r);
1561 return;
1562 }
39ae355f 1563 state = pushing;
20effc67 1564 handle_new_head(dpp, std::move(p), r);
39ae355f
TL
1565 break;
1566
1567 case meta_reading:
1568 if (r < 0) {
1569 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1570 << " read_meta failed: r=" << r
1571 << " tid=" << tid << dendl;
1572 complete(std::move(p), r);
1573 return;
1574 }
1575 state = pushing;
1576 prep_then_push(dpp, std::move(p), r);
1577 break;
f67539c2
TL
1578 }
1579 }
1580
20effc67 1581 void handle_new_head(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
f67539c2
TL
1582 if (r == -ECANCELED) {
1583 if (p->i == MAX_RACE_RETRIES) {
20effc67 1584 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1585 << " canceled too many times, giving up: tid=" << tid << dendl;
1586 complete(std::move(p), -ECANCELED);
1587 return;
1588 }
1589 ++p->i;
1590 } else if (r) {
1591 complete(std::move(p), r);
1592 return;
1593 }
1594
1595 if (p->batch.empty()) {
20effc67 1596 prep_then_push(dpp, std::move(p), 0);
f67539c2
TL
1597 return;
1598 } else {
1599 push(std::move(p));
1600 return;
1601 }
1602 }
1603
b3b6e05e 1604 Pusher(const DoutPrefixProvider *dpp, FIFO* f, std::deque<cb::list>&& remaining,
39ae355f
TL
1605 std::int64_t head_part_num, std::uint64_t tid,
1606 lr::AioCompletion* super)
b3b6e05e 1607 : Completion(dpp, super), f(f), remaining(std::move(remaining)),
39ae355f 1608 head_part_num(head_part_num), tid(tid) {}
f67539c2
TL
1609};
1610
b3b6e05e 1611void FIFO::push(const DoutPrefixProvider *dpp, const std::vector<cb::list>& data_bufs,
f67539c2
TL
1612 lr::AioCompletion* c)
1613{
1614 std::unique_lock l(m);
1615 auto tid = ++next_tid;
1616 auto max_entry_size = info.params.max_entry_size;
1617 auto need_new_head = info.need_new_head();
39ae355f 1618 auto head_part_num = info.head_part_num;
f67539c2 1619 l.unlock();
b3b6e05e 1620 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 1621 << " entering: tid=" << tid << dendl;
b3b6e05e 1622 auto p = std::make_unique<Pusher>(dpp, this, std::deque<cb::list>(data_bufs.begin(), data_bufs.end()),
39ae355f 1623 head_part_num, tid, c);
f67539c2
TL
1624 // Validate sizes
1625 for (const auto& bl : data_bufs) {
1626 if (bl.length() > max_entry_size) {
b3b6e05e 1627 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1628 << " entry bigger than max_entry_size tid=" << tid << dendl;
1629 Pusher::complete(std::move(p), -E2BIG);
1630 return;
1631 }
1632 }
1633
1634 if (data_bufs.empty() ) {
b3b6e05e 1635 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1636 << " empty push, returning success tid=" << tid << dendl;
1637 Pusher::complete(std::move(p), 0);
1638 return;
1639 }
1640
1641 if (need_new_head) {
b3b6e05e 1642 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 1643 << " need new head tid=" << tid << dendl;
b3b6e05e 1644 p->new_head(dpp, std::move(p));
f67539c2 1645 } else {
20effc67 1646 p->prep_then_push(dpp, std::move(p), 0);
f67539c2
TL
1647 }
1648}
1649
b3b6e05e 1650int FIFO::list(const DoutPrefixProvider *dpp, int max_entries,
f67539c2
TL
1651 std::optional<std::string_view> markstr,
1652 std::vector<list_entry>* presult, bool* pmore,
1653 optional_yield y)
1654{
1655 std::unique_lock l(m);
1656 auto tid = ++next_tid;
1657 std::int64_t part_num = info.tail_part_num;
1658 l.unlock();
b3b6e05e 1659 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1660 << " entering: tid=" << tid << dendl;
1661 std::uint64_t ofs = 0;
1662 if (markstr) {
1663 auto marker = to_marker(*markstr);
1664 if (!marker) {
b3b6e05e 1665 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1666 << " invalid marker string: " << markstr
1667 << " tid= "<< tid << dendl;
1668 return -EINVAL;
1669 }
1670 part_num = marker->num;
1671 ofs = marker->ofs;
1672 }
1673
1674 std::vector<list_entry> result;
1675 result.reserve(max_entries);
1676 bool more = false;
1677
1678 std::vector<fifo::part_list_entry> entries;
1679 int r = 0;
1680 while (max_entries > 0) {
b3b6e05e 1681 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1682 << " max_entries=" << max_entries << " tid=" << tid << dendl;
1683 bool part_more = false;
1684 bool part_full = false;
1685
1686 std::unique_lock l(m);
1687 auto part_oid = info.part_oid(part_num);
1688 l.unlock();
1689
39ae355f
TL
1690 r = list_part(dpp, ioctx, part_oid, ofs, max_entries, &entries,
1691 &part_more, &part_full, tid, y);
f67539c2 1692 if (r == -ENOENT) {
b3b6e05e 1693 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1694 << " missing part, rereading metadata"
1695 << " tid= "<< tid << dendl;
b3b6e05e 1696 r = read_meta(dpp, tid, y);
f67539c2 1697 if (r < 0) {
b3b6e05e 1698 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1699 << " read_meta failed: r=" << r
1700 << " tid= "<< tid << dendl;
1701 return r;
1702 }
1703 if (part_num < info.tail_part_num) {
1704 /* raced with trim? restart */
b3b6e05e 1705 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1706 << " raced with trim, restarting: tid=" << tid << dendl;
1707 max_entries += result.size();
1708 result.clear();
1709 std::unique_lock l(m);
1710 part_num = info.tail_part_num;
1711 l.unlock();
1712 ofs = 0;
1713 continue;
1714 }
b3b6e05e 1715 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1716 << " assuming part was not written yet, so end of data: "
1717 << "tid=" << tid << dendl;
1718 more = false;
1719 r = 0;
1720 break;
1721 }
1722 if (r < 0) {
b3b6e05e 1723 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1724 << " list_entries failed: r=" << r
1725 << " tid= "<< tid << dendl;
1726 return r;
1727 }
1728 more = part_full || part_more;
1729 for (auto& entry : entries) {
1730 list_entry e;
1731 e.data = std::move(entry.data);
1732 e.marker = marker{part_num, entry.ofs}.to_string();
1733 e.mtime = entry.mtime;
1734 result.push_back(std::move(e));
1735 --max_entries;
1736 if (max_entries == 0)
1737 break;
1738 }
1739 entries.clear();
1740 if (max_entries > 0 &&
1741 part_more) {
1742 }
1743
1744 if (!part_full) {
b3b6e05e 1745 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1746 << " head part is not full, so we can assume we're done: "
1747 << "tid=" << tid << dendl;
1748 break;
1749 }
1750 if (!part_more) {
1751 ++part_num;
1752 ofs = 0;
1753 }
1754 }
1755 if (presult)
1756 *presult = std::move(result);
1757 if (pmore)
1758 *pmore = more;
1759 return 0;
1760}
1761
b3b6e05e 1762int FIFO::trim(const DoutPrefixProvider *dpp, std::string_view markstr, bool exclusive, optional_yield y)
f67539c2
TL
1763{
1764 bool overshoot = false;
1765 auto marker = to_marker(markstr);
1766 if (!marker) {
1767 return -EINVAL;
1768 }
1769 auto part_num = marker->num;
1770 auto ofs = marker->ofs;
1771 std::unique_lock l(m);
1772 auto tid = ++next_tid;
1773 auto hn = info.head_part_num;
1774 const auto max_part_size = info.params.max_part_size;
1775 if (part_num > hn) {
1776 l.unlock();
b3b6e05e 1777 auto r = read_meta(dpp, tid, y);
f67539c2
TL
1778 if (r < 0) {
1779 return r;
1780 }
1781 l.lock();
1782 auto hn = info.head_part_num;
1783 if (part_num > hn) {
1784 overshoot = true;
1785 part_num = hn;
1786 ofs = max_part_size;
1787 }
1788 }
1789 if (part_num < info.tail_part_num) {
1790 return -ENODATA;
1791 }
1792 auto pn = info.tail_part_num;
1793 l.unlock();
b3b6e05e 1794 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1795 << " entering: tid=" << tid << dendl;
1796
1797 int r = 0;
1798 while (pn < part_num) {
b3b6e05e 1799 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1800 << " pn=" << pn << " tid=" << tid << dendl;
1801 std::unique_lock l(m);
1802 l.unlock();
39ae355f 1803 r = trim_part(dpp, pn, max_part_size, false, tid, y);
f67539c2 1804 if (r < 0 && r == -ENOENT) {
b3b6e05e 1805 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1806 << " trim_part failed: r=" << r
1807 << " tid= "<< tid << dendl;
1808 return r;
1809 }
1810 ++pn;
1811 }
39ae355f 1812 r = trim_part(dpp, part_num, ofs, exclusive, tid, y);
f67539c2 1813 if (r < 0 && r != -ENOENT) {
b3b6e05e 1814 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1815 << " trim_part failed: r=" << r
1816 << " tid= "<< tid << dendl;
1817 return r;
1818 }
1819
1820 l.lock();
1821 auto tail_part_num = info.tail_part_num;
1822 auto objv = info.version;
1823 l.unlock();
1824 bool canceled = tail_part_num < part_num;
1825 int retries = 0;
1826 while ((tail_part_num < part_num) &&
1827 canceled &&
1828 (retries <= MAX_RACE_RETRIES)) {
b3b6e05e 1829 r = _update_meta(dpp, fifo::update{}.tail_part_num(part_num), objv, &canceled,
f67539c2
TL
1830 tid, y);
1831 if (r < 0) {
b3b6e05e 1832 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1833 << " _update_meta failed: r=" << r
1834 << " tid= "<< tid << dendl;
1835 return r;
1836 }
1837 if (canceled) {
b3b6e05e 1838 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1839 << " canceled: retries=" << retries
1840 << " tid=" << tid << dendl;
1841 l.lock();
1842 tail_part_num = info.tail_part_num;
1843 objv = info.version;
1844 l.unlock();
1845 ++retries;
1846 }
1847 }
1848 if (canceled) {
b3b6e05e 1849 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1850 << " canceled too many times, giving up: tid=" << tid << dendl;
1851 return -EIO;
1852 }
1853 return overshoot ? -ENODATA : 0;
1854}
1855
1856struct Trimmer : public Completion<Trimmer> {
1857 FIFO* fifo;
1858 std::int64_t part_num;
1859 std::uint64_t ofs;
1860 std::int64_t pn;
1861 bool exclusive;
1862 std::uint64_t tid;
1863 bool update = false;
1864 bool reread = false;
1865 bool canceled = false;
1866 bool overshoot = false;
1867 int retries = 0;
1868
b3b6e05e 1869 Trimmer(const DoutPrefixProvider *dpp, FIFO* fifo, std::int64_t part_num, std::uint64_t ofs, std::int64_t pn,
f67539c2 1870 bool exclusive, lr::AioCompletion* super, std::uint64_t tid)
b3b6e05e 1871 : Completion(dpp, super), fifo(fifo), part_num(part_num), ofs(ofs), pn(pn),
f67539c2
TL
1872 exclusive(exclusive), tid(tid) {}
1873
b3b6e05e
TL
1874 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
1875 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1876 << " entering: tid=" << tid << dendl;
1877
1878 if (reread) {
1879 reread = false;
1880 if (r < 0) {
b3b6e05e 1881 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1882 << " read_meta failed: r="
1883 << r << " tid=" << tid << dendl;
1884 complete(std::move(p), r);
1885 return;
1886 }
1887 std::unique_lock l(fifo->m);
1888 auto hn = fifo->info.head_part_num;
1889 const auto max_part_size = fifo->info.params.max_part_size;
1890 const auto tail_part_num = fifo->info.tail_part_num;
1891 l.unlock();
1892 if (part_num > hn) {
1893 part_num = hn;
1894 ofs = max_part_size;
1895 overshoot = true;
1896 }
1897 if (part_num < tail_part_num) {
1898 complete(std::move(p), -ENODATA);
1899 return;
1900 }
1901 pn = tail_part_num;
1902 if (pn < part_num) {
b3b6e05e 1903 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 1904 << " pn=" << pn << " tid=" << tid << dendl;
39ae355f
TL
1905 fifo->trim_part(dpp, pn++, max_part_size, false, tid,
1906 call(std::move(p)));
f67539c2
TL
1907 } else {
1908 update = true;
1909 canceled = tail_part_num < part_num;
39ae355f 1910 fifo->trim_part(dpp, part_num, ofs, exclusive, tid, call(std::move(p)));
f67539c2
TL
1911 }
1912 return;
1913 }
1914
1915 if (r == -ENOENT) {
1916 r = 0;
1917 }
1918
1919 if (r < 0) {
b3b6e05e 1920 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1921 << (update ? " update_meta " : " trim ") << "failed: r="
1922 << r << " tid=" << tid << dendl;
1923 complete(std::move(p), r);
1924 return;
1925 }
1926
1927 if (!update) {
b3b6e05e 1928 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1929 << " handling preceding trim callback: tid=" << tid << dendl;
1930 retries = 0;
1931 if (pn < part_num) {
b3b6e05e 1932 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1933 << " pn=" << pn << " tid=" << tid << dendl;
1934 std::unique_lock l(fifo->m);
1935 const auto max_part_size = fifo->info.params.max_part_size;
1936 l.unlock();
39ae355f
TL
1937 fifo->trim_part(dpp, pn++, max_part_size, false, tid,
1938 call(std::move(p)));
f67539c2
TL
1939 return;
1940 }
1941
1942 std::unique_lock l(fifo->m);
1943 const auto tail_part_num = fifo->info.tail_part_num;
1944 l.unlock();
1945 update = true;
1946 canceled = tail_part_num < part_num;
39ae355f 1947 fifo->trim_part(dpp, part_num, ofs, exclusive, tid, call(std::move(p)));
f67539c2
TL
1948 return;
1949 }
1950
b3b6e05e 1951 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1952 << " handling update-needed callback: tid=" << tid << dendl;
1953 std::unique_lock l(fifo->m);
1954 auto tail_part_num = fifo->info.tail_part_num;
1955 auto objv = fifo->info.version;
1956 l.unlock();
1957 if ((tail_part_num < part_num) &&
1958 canceled) {
1959 if (retries > MAX_RACE_RETRIES) {
b3b6e05e 1960 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1961 << " canceled too many times, giving up: tid=" << tid << dendl;
1962 complete(std::move(p), -EIO);
1963 return;
1964 }
1965 ++retries;
b3b6e05e 1966 fifo->_update_meta(dpp, fifo::update{}
f67539c2
TL
1967 .tail_part_num(part_num), objv, &canceled,
1968 tid, call(std::move(p)));
1969 } else {
1970 complete(std::move(p), overshoot ? -ENODATA : 0);
1971 }
1972 }
1973};
1974
b3b6e05e 1975void FIFO::trim(const DoutPrefixProvider *dpp, std::string_view markstr, bool exclusive,
f67539c2
TL
1976 lr::AioCompletion* c) {
1977 auto marker = to_marker(markstr);
1978 auto realmark = marker.value_or(::rgw::cls::fifo::marker{});
1979 std::unique_lock l(m);
1980 const auto hn = info.head_part_num;
1981 const auto max_part_size = info.params.max_part_size;
1982 const auto pn = info.tail_part_num;
1983 const auto part_oid = info.part_oid(pn);
1984 auto tid = ++next_tid;
1985 l.unlock();
b3b6e05e 1986 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 1987 << " entering: tid=" << tid << dendl;
b3b6e05e 1988 auto trimmer = std::make_unique<Trimmer>(dpp, this, realmark.num, realmark.ofs,
f67539c2
TL
1989 pn, exclusive, c, tid);
1990 if (!marker) {
1991 Trimmer::complete(std::move(trimmer), -EINVAL);
1992 return;
1993 }
1994 ++trimmer->pn;
1995 auto ofs = marker->ofs;
1996 if (marker->num > hn) {
1997 trimmer->reread = true;
b3b6e05e 1998 read_meta(dpp, tid, Trimmer::call(std::move(trimmer)));
f67539c2
TL
1999 return;
2000 }
2001 if (pn < marker->num) {
b3b6e05e 2002 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2003 << " pn=" << pn << " tid=" << tid << dendl;
2004 ofs = max_part_size;
2005 } else {
2006 trimmer->update = true;
2007 }
39ae355f 2008 trim_part(dpp, pn, ofs, exclusive, tid, Trimmer::call(std::move(trimmer)));
f67539c2
TL
2009}
2010
b3b6e05e 2011int FIFO::get_part_info(const DoutPrefixProvider *dpp, int64_t part_num,
f67539c2
TL
2012 fifo::part_header* header,
2013 optional_yield y)
2014{
2015 std::unique_lock l(m);
2016 const auto part_oid = info.part_oid(part_num);
2017 auto tid = ++next_tid;
2018 l.unlock();
b3b6e05e 2019 auto r = rgw::cls::fifo::get_part_info(dpp, ioctx, part_oid, header, tid, y);
f67539c2 2020 if (r < 0) {
b3b6e05e 2021 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2022 << " get_part_info failed: r="
2023 << r << " tid=" << tid << dendl;
2024 }
2025 return r;
2026}
2027
2028void FIFO::get_part_info(int64_t part_num,
2029 fifo::part_header* header,
2030 lr::AioCompletion* c)
2031{
2032 std::unique_lock l(m);
2033 const auto part_oid = info.part_oid(part_num);
2034 auto tid = ++next_tid;
2035 l.unlock();
2036 auto op = rgw::cls::fifo::get_part_info(cct, header, tid);
2037 auto r = ioctx.aio_operate(part_oid, c, &op, nullptr);
2038 ceph_assert(r >= 0);
2039}
2040
2041struct InfoGetter : Completion<InfoGetter> {
2042 FIFO* fifo;
2043 fifo::part_header header;
2044 fu2::function<void(int r, fifo::part_header&&)> f;
2045 std::uint64_t tid;
2046 bool headerread = false;
2047
b3b6e05e 2048 InfoGetter(const DoutPrefixProvider *dpp, FIFO* fifo, fu2::function<void(int r, fifo::part_header&&)> f,
f67539c2 2049 std::uint64_t tid, lr::AioCompletion* super)
b3b6e05e
TL
2050 : Completion(dpp, super), fifo(fifo), f(std::move(f)), tid(tid) {}
2051 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
f67539c2
TL
2052 if (!headerread) {
2053 if (r < 0) {
b3b6e05e 2054 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2055 << " read_meta failed: r="
2056 << r << " tid=" << tid << dendl;
2057 if (f)
2058 f(r, {});
2059 complete(std::move(p), r);
2060 return;
2061 }
2062
2063 auto info = fifo->meta();
2064 auto hpn = info.head_part_num;
2065 if (hpn < 0) {
b3b6e05e 2066 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2067 << " no head, returning empty partinfo r="
2068 << r << " tid=" << tid << dendl;
2069 if (f)
2070 f(0, {});
2071 complete(std::move(p), r);
2072 return;
2073 }
2074 headerread = true;
2075 auto op = rgw::cls::fifo::get_part_info(fifo->cct, &header, tid);
2076 std::unique_lock l(fifo->m);
2077 auto oid = fifo->info.part_oid(hpn);
2078 l.unlock();
2079 r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op,
2080 nullptr);
2081 ceph_assert(r >= 0);
2082 return;
2083 }
2084
2085 if (r < 0) {
b3b6e05e 2086 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2087 << " get_part_info failed: r="
2088 << r << " tid=" << tid << dendl;
2089 }
2090
2091 if (f)
2092 f(r, std::move(header));
2093 complete(std::move(p), r);
2094 return;
2095 }
2096};
2097
b3b6e05e 2098void FIFO::get_head_info(const DoutPrefixProvider *dpp, fu2::unique_function<void(int r,
f67539c2
TL
2099 fifo::part_header&&)> f,
2100 lr::AioCompletion* c)
2101{
2102 std::unique_lock l(m);
2103 auto tid = ++next_tid;
2104 l.unlock();
b3b6e05e
TL
2105 auto ig = std::make_unique<InfoGetter>(dpp, this, std::move(f), tid, c);
2106 read_meta(dpp, tid, InfoGetter::call(std::move(ig)));
f67539c2
TL
2107}
2108
2109struct JournalProcessor : public Completion<JournalProcessor> {
2110private:
2111 FIFO* const fifo;
2112
2113 std::vector<fifo::journal_entry> processed;
39ae355f
TL
2114 decltype(fifo->info.journal) journal;
2115 decltype(journal)::iterator iter;
f67539c2
TL
2116 std::int64_t new_tail;
2117 std::int64_t new_head;
2118 std::int64_t new_max;
2119 int race_retries = 0;
2120 bool first_pp = true;
2121 bool canceled = false;
2122 std::uint64_t tid;
2123
2124 enum {
2125 entry_callback,
2126 pp_callback,
2127 } state;
2128
39ae355f 2129 void create_part(const DoutPrefixProvider *dpp, Ptr&& p, int64_t part_num) {
20effc67 2130 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2131 << " entering: tid=" << tid << dendl;
2132 state = entry_callback;
2133 lr::ObjectWriteOperation op;
2134 op.create(false); /* We don't need exclusivity, part_init ensures
2135 we're creating from the same journal entry. */
2136 std::unique_lock l(fifo->m);
39ae355f 2137 part_init(&op, fifo->info.params);
f67539c2
TL
2138 auto oid = fifo->info.part_oid(part_num);
2139 l.unlock();
2140 auto r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op);
2141 ceph_assert(r >= 0);
2142 return;
2143 }
2144
39ae355f 2145 void remove_part(const DoutPrefixProvider *dpp, Ptr&& p, int64_t part_num) {
20effc67 2146 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2147 << " entering: tid=" << tid << dendl;
2148 state = entry_callback;
2149 lr::ObjectWriteOperation op;
2150 op.remove();
2151 std::unique_lock l(fifo->m);
2152 auto oid = fifo->info.part_oid(part_num);
2153 l.unlock();
2154 auto r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op);
2155 ceph_assert(r >= 0);
2156 return;
2157 }
2158
b3b6e05e 2159 void finish_je(const DoutPrefixProvider *dpp, Ptr&& p, int r,
f67539c2 2160 const fifo::journal_entry& entry) {
b3b6e05e 2161 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2162 << " entering: tid=" << tid << dendl;
2163
b3b6e05e 2164 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2165 << " finishing entry: entry=" << entry
2166 << " tid=" << tid << dendl;
2167
1e59de90
TL
2168 using enum fifo::journal_entry::Op;
2169 if (entry.op == remove && r == -ENOENT)
f67539c2
TL
2170 r = 0;
2171
2172 if (r < 0) {
b3b6e05e 2173 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2174 << " processing entry failed: entry=" << entry
2175 << " r=" << r << " tid=" << tid << dendl;
2176 complete(std::move(p), r);
2177 return;
2178 } else {
2179 switch (entry.op) {
1e59de90
TL
2180 case unknown:
2181 case set_head:
f67539c2
TL
2182 // Can't happen. Filtered out in process.
2183 complete(std::move(p), -EIO);
2184 return;
2185
1e59de90 2186 case create:
f67539c2
TL
2187 if (entry.part_num > new_max) {
2188 new_max = entry.part_num;
2189 }
2190 break;
1e59de90 2191 case remove:
f67539c2
TL
2192 if (entry.part_num >= new_tail) {
2193 new_tail = entry.part_num + 1;
2194 }
2195 break;
2196 }
2197 processed.push_back(entry);
2198 }
2199 ++iter;
b3b6e05e 2200 process(dpp, std::move(p));
f67539c2
TL
2201 }
2202
b3b6e05e
TL
2203 void postprocess(const DoutPrefixProvider *dpp, Ptr&& p) {
2204 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2205 << " entering: tid=" << tid << dendl;
2206 if (processed.empty()) {
b3b6e05e 2207 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2208 << " nothing to update any more: race_retries="
2209 << race_retries << " tid=" << tid << dendl;
2210 complete(std::move(p), 0);
2211 return;
2212 }
b3b6e05e 2213 pp_run(dpp, std::move(p), 0, false);
f67539c2
TL
2214 }
2215
2216public:
2217
b3b6e05e
TL
2218 JournalProcessor(const DoutPrefixProvider *dpp, FIFO* fifo, std::uint64_t tid, lr::AioCompletion* super)
2219 : Completion(dpp, super), fifo(fifo), tid(tid) {
f67539c2
TL
2220 std::unique_lock l(fifo->m);
2221 journal = fifo->info.journal;
2222 iter = journal.begin();
2223 new_tail = fifo->info.tail_part_num;
2224 new_head = fifo->info.head_part_num;
2225 new_max = fifo->info.max_push_part_num;
2226 }
2227
b3b6e05e
TL
2228 void pp_run(const DoutPrefixProvider *dpp, Ptr&& p, int r, bool canceled) {
2229 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2230 << " entering: tid=" << tid << dendl;
2231 std::optional<int64_t> tail_part_num;
2232 std::optional<int64_t> head_part_num;
2233 std::optional<int64_t> max_part_num;
2234
2235 if (r < 0) {
b3b6e05e 2236 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2237 << " failed, r=: " << r << " tid=" << tid << dendl;
2238 complete(std::move(p), r);
2239 }
2240
2241
b3b6e05e 2242 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2243 << " postprocessing: race_retries="
2244 << race_retries << " tid=" << tid << dendl;
2245
2246 if (!first_pp && r == 0 && !canceled) {
b3b6e05e 2247 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2248 << " nothing to update any more: race_retries="
2249 << race_retries << " tid=" << tid << dendl;
2250 complete(std::move(p), 0);
2251 return;
2252 }
2253
2254 first_pp = false;
2255
2256 if (canceled) {
2257 if (race_retries >= MAX_RACE_RETRIES) {
b3b6e05e 2258 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2259 << " canceled too many times, giving up: tid="
2260 << tid << dendl;
2261 complete(std::move(p), -ECANCELED);
2262 return;
2263 }
b3b6e05e 2264 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2265 << " update canceled, retrying: race_retries="
2266 << race_retries << " tid=" << tid << dendl;
2267
2268 ++race_retries;
2269
2270 std::vector<fifo::journal_entry> new_processed;
2271 std::unique_lock l(fifo->m);
2272 for (auto& e : processed) {
39ae355f
TL
2273 if (fifo->info.journal.contains(e)) {
2274 new_processed.push_back(e);
f67539c2 2275 }
f67539c2
TL
2276 }
2277 processed = std::move(new_processed);
2278 }
2279
2280 std::unique_lock l(fifo->m);
2281 auto objv = fifo->info.version;
2282 if (new_tail > fifo->info.tail_part_num) {
2283 tail_part_num = new_tail;
2284 }
2285
2286 if (new_head > fifo->info.head_part_num) {
2287 head_part_num = new_head;
2288 }
2289
2290 if (new_max > fifo->info.max_push_part_num) {
2291 max_part_num = new_max;
2292 }
2293 l.unlock();
2294
2295 if (processed.empty() &&
2296 !tail_part_num &&
2297 !max_part_num) {
2298 /* nothing to update anymore */
b3b6e05e 2299 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2300 << " nothing to update any more: race_retries="
2301 << race_retries << " tid=" << tid << dendl;
2302 complete(std::move(p), 0);
2303 return;
2304 }
2305 state = pp_callback;
b3b6e05e 2306 fifo->_update_meta(dpp, fifo::update{}
f67539c2
TL
2307 .tail_part_num(tail_part_num)
2308 .head_part_num(head_part_num)
2309 .max_push_part_num(max_part_num)
2310 .journal_entries_rm(processed),
2311 objv, &this->canceled, tid, call(std::move(p)));
2312 return;
2313 }
2314
2315 JournalProcessor(const JournalProcessor&) = delete;
2316 JournalProcessor& operator =(const JournalProcessor&) = delete;
2317 JournalProcessor(JournalProcessor&&) = delete;
2318 JournalProcessor& operator =(JournalProcessor&&) = delete;
2319
b3b6e05e
TL
2320 void process(const DoutPrefixProvider *dpp, Ptr&& p) {
2321 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2322 << " entering: tid=" << tid << dendl;
2323 while (iter != journal.end()) {
b3b6e05e 2324 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2325 << " processing entry: entry=" << *iter
2326 << " tid=" << tid << dendl;
39ae355f 2327 const auto entry = *iter;
f67539c2 2328 switch (entry.op) {
1e59de90
TL
2329 using enum fifo::journal_entry::Op;
2330 case create:
39ae355f 2331 create_part(dpp, std::move(p), entry.part_num);
f67539c2 2332 return;
1e59de90 2333 case set_head:
f67539c2
TL
2334 if (entry.part_num > new_head) {
2335 new_head = entry.part_num;
2336 }
2337 processed.push_back(entry);
2338 ++iter;
2339 continue;
1e59de90 2340 case remove:
39ae355f 2341 remove_part(dpp, std::move(p), entry.part_num);
f67539c2
TL
2342 return;
2343 default:
20effc67 2344 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2345 << " unknown journaled op: entry=" << entry << " tid="
2346 << tid << dendl;
2347 complete(std::move(p), -EIO);
2348 return;
2349 }
2350 }
b3b6e05e 2351 postprocess(dpp, std::move(p));
f67539c2
TL
2352 return;
2353 }
2354
b3b6e05e
TL
2355 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
2356 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2357 << " entering: tid=" << tid << dendl;
2358 switch (state) {
2359 case entry_callback:
39ae355f 2360 finish_je(dpp, std::move(p), r, *iter);
f67539c2
TL
2361 return;
2362 case pp_callback:
2363 auto c = canceled;
2364 canceled = false;
b3b6e05e 2365 pp_run(dpp, std::move(p), r, c);
f67539c2
TL
2366 return;
2367 }
2368
2369 abort();
2370 }
2371
2372};
2373
b3b6e05e
TL
2374void FIFO::process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c) {
2375 auto p = std::make_unique<JournalProcessor>(dpp, this, tid, c);
2376 p->process(dpp, std::move(p));
f67539c2
TL
2377}
2378
2379struct Lister : Completion<Lister> {
2380 FIFO* f;
2381 std::vector<list_entry> result;
2382 bool more = false;
2383 std::int64_t part_num;
2384 std::uint64_t ofs;
2385 int max_entries;
2386 int r_out = 0;
2387 std::vector<fifo::part_list_entry> entries;
2388 bool part_more = false;
2389 bool part_full = false;
2390 std::vector<list_entry>* entries_out;
2391 bool* more_out;
2392 std::uint64_t tid;
2393
2394 bool read = false;
2395
2396 void complete(Ptr&& p, int r) {
2397 if (r >= 0) {
2398 if (more_out) *more_out = more;
2399 if (entries_out) *entries_out = std::move(result);
2400 }
2401 Completion::complete(std::move(p), r);
2402 }
2403
2404public:
b3b6e05e 2405 Lister(const DoutPrefixProvider *dpp, FIFO* f, std::int64_t part_num, std::uint64_t ofs, int max_entries,
f67539c2
TL
2406 std::vector<list_entry>* entries_out, bool* more_out,
2407 std::uint64_t tid, lr::AioCompletion* super)
b3b6e05e 2408 : Completion(dpp, super), f(f), part_num(part_num), ofs(ofs), max_entries(max_entries),
f67539c2
TL
2409 entries_out(entries_out), more_out(more_out), tid(tid) {
2410 result.reserve(max_entries);
2411 }
2412
2413 Lister(const Lister&) = delete;
2414 Lister& operator =(const Lister&) = delete;
2415 Lister(Lister&&) = delete;
2416 Lister& operator =(Lister&&) = delete;
2417
b3b6e05e 2418 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
f67539c2
TL
2419 if (read)
2420 handle_read(std::move(p), r);
2421 else
b3b6e05e 2422 handle_list(dpp, std::move(p), r);
f67539c2
TL
2423 }
2424
2425 void list(Ptr&& p) {
2426 if (max_entries > 0) {
2427 part_more = false;
2428 part_full = false;
2429 entries.clear();
2430
2431 std::unique_lock l(f->m);
2432 auto part_oid = f->info.part_oid(part_num);
2433 l.unlock();
2434
2435 read = false;
39ae355f
TL
2436 auto op = list_part(f->cct, ofs, max_entries, &r_out,
2437 &entries, &part_more, &part_full, tid);
f67539c2
TL
2438 f->ioctx.aio_operate(part_oid, call(std::move(p)), &op, nullptr);
2439 } else {
2440 complete(std::move(p), 0);
2441 }
2442 }
2443
2444 void handle_read(Ptr&& p, int r) {
2445 read = false;
2446 if (r >= 0) r = r_out;
2447 r_out = 0;
2448
2449 if (r < 0) {
2450 complete(std::move(p), r);
2451 return;
2452 }
2453
2454 if (part_num < f->info.tail_part_num) {
2455 /* raced with trim? restart */
2456 max_entries += result.size();
2457 result.clear();
2458 part_num = f->info.tail_part_num;
2459 ofs = 0;
2460 list(std::move(p));
2461 return;
2462 }
2463 /* assuming part was not written yet, so end of data */
2464 more = false;
2465 complete(std::move(p), 0);
2466 return;
2467 }
2468
b3b6e05e 2469 void handle_list(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
f67539c2
TL
2470 if (r >= 0) r = r_out;
2471 r_out = 0;
2472 std::unique_lock l(f->m);
2473 auto part_oid = f->info.part_oid(part_num);
2474 l.unlock();
2475 if (r == -ENOENT) {
2476 read = true;
b3b6e05e 2477 f->read_meta(dpp, tid, call(std::move(p)));
f67539c2
TL
2478 return;
2479 }
2480 if (r < 0) {
2481 complete(std::move(p), r);
2482 return;
2483 }
2484
2485 more = part_full || part_more;
2486 for (auto& entry : entries) {
2487 list_entry e;
2488 e.data = std::move(entry.data);
2489 e.marker = marker{part_num, entry.ofs}.to_string();
2490 e.mtime = entry.mtime;
2491 result.push_back(std::move(e));
2492 }
2493 max_entries -= entries.size();
2494 entries.clear();
2495 if (max_entries > 0 && part_more) {
2496 list(std::move(p));
2497 return;
2498 }
2499
2500 if (!part_full) { /* head part is not full */
2501 complete(std::move(p), 0);
2502 return;
2503 }
2504 ++part_num;
2505 ofs = 0;
2506 list(std::move(p));
2507 }
2508};
2509
b3b6e05e 2510void FIFO::list(const DoutPrefixProvider *dpp, int max_entries,
f67539c2
TL
2511 std::optional<std::string_view> markstr,
2512 std::vector<list_entry>* out,
2513 bool* more,
2514 lr::AioCompletion* c) {
2515 std::unique_lock l(m);
2516 auto tid = ++next_tid;
2517 std::int64_t part_num = info.tail_part_num;
2518 l.unlock();
2519 std::uint64_t ofs = 0;
2520 std::optional<::rgw::cls::fifo::marker> marker;
2521
2522 if (markstr) {
2523 marker = to_marker(*markstr);
2524 if (marker) {
2525 part_num = marker->num;
2526 ofs = marker->ofs;
2527 }
2528 }
2529
b3b6e05e 2530 auto ls = std::make_unique<Lister>(dpp, this, part_num, ofs, max_entries, out,
f67539c2
TL
2531 more, tid, c);
2532 if (markstr && !marker) {
2533 auto l = ls.get();
2534 l->complete(std::move(ls), -EINVAL);
2535 } else {
2536 ls->list(std::move(ls));
2537 }
2538}
2539}