]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/cls_fifo_legacy.cc
import ceph quincy 17.2.6
[ceph.git] / ceph / src / rgw / cls_fifo_legacy.cc
CommitLineData
f67539c2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2020 Red Hat <contact@redhat.com>
7 * Author: Adam C. Emerson
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
39ae355f 16#include <algorithm>
f67539c2
TL
17#include <cstdint>
18#include <numeric>
19#include <optional>
20#include <string_view>
21
22#undef FMT_HEADER_ONLY
23#define FMT_HEADER_ONLY 1
24#include <fmt/format.h>
25
26#include "include/rados/librados.hpp"
27
28#include "include/buffer.h"
29
30#include "common/async/yield_context.h"
31#include "common/random_string.h"
32
33#include "cls/fifo/cls_fifo_types.h"
34#include "cls/fifo/cls_fifo_ops.h"
35
36#include "cls_fifo_legacy.h"
37
38namespace rgw::cls::fifo {
f67539c2
TL
39namespace cb = ceph::buffer;
40namespace fifo = rados::cls::fifo;
41
42using ceph::from_error_code;
43
44inline constexpr auto MAX_RACE_RETRIES = 10;
45
46void create_meta(lr::ObjectWriteOperation* op,
47 std::string_view id,
48 std::optional<fifo::objv> objv,
49 std::optional<std::string_view> oid_prefix,
50 bool exclusive,
51 std::uint64_t max_part_size,
52 std::uint64_t max_entry_size)
53{
54 fifo::op::create_meta cm;
55
56 cm.id = id;
57 cm.version = objv;
58 cm.oid_prefix = oid_prefix;
59 cm.max_part_size = max_part_size;
60 cm.max_entry_size = max_entry_size;
61 cm.exclusive = exclusive;
62
63 cb::list in;
64 encode(cm, in);
65 op->exec(fifo::op::CLASS, fifo::op::CREATE_META, in);
66}
67
b3b6e05e 68int get_meta(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid,
f67539c2
TL
69 std::optional<fifo::objv> objv, fifo::info* info,
70 std::uint32_t* part_header_size,
71 std::uint32_t* part_entry_overhead,
72 uint64_t tid, optional_yield y,
73 bool probe)
74{
75 lr::ObjectReadOperation op;
76 fifo::op::get_meta gm;
77 gm.version = objv;
78 cb::list in;
79 encode(gm, in);
80 cb::list bl;
81
82 op.exec(fifo::op::CLASS, fifo::op::GET_META, in,
83 &bl, nullptr);
b3b6e05e 84 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, nullptr, y);
f67539c2
TL
85 if (r >= 0) try {
86 fifo::op::get_meta_reply reply;
87 auto iter = bl.cbegin();
88 decode(reply, iter);
89 if (info) *info = std::move(reply.info);
90 if (part_header_size) *part_header_size = reply.part_header_size;
91 if (part_entry_overhead)
92 *part_entry_overhead = reply.part_entry_overhead;
93 } catch (const cb::error& err) {
b3b6e05e 94 ldpp_dout(dpp, -1)
f67539c2
TL
95 << __PRETTY_FUNCTION__ << ":" << __LINE__
96 << " decode failed: " << err.what()
97 << " tid=" << tid << dendl;
98 r = from_error_code(err.code());
99 } else if (!(probe && (r == -ENOENT || r == -ENODATA))) {
b3b6e05e 100 ldpp_dout(dpp, -1)
f67539c2
TL
101 << __PRETTY_FUNCTION__ << ":" << __LINE__
102 << " fifo::op::GET_META failed r=" << r << " tid=" << tid
103 << dendl;
104 }
105 return r;
106};
107
108namespace {
109void update_meta(lr::ObjectWriteOperation* op, const fifo::objv& objv,
110 const fifo::update& update)
111{
112 fifo::op::update_meta um;
113
114 um.version = objv;
115 um.tail_part_num = update.tail_part_num();
116 um.head_part_num = update.head_part_num();
117 um.min_push_part_num = update.min_push_part_num();
118 um.max_push_part_num = update.max_push_part_num();
119 um.journal_entries_add = std::move(update).journal_entries_add();
120 um.journal_entries_rm = std::move(update).journal_entries_rm();
121
122 cb::list in;
123 encode(um, in);
124 op->exec(fifo::op::CLASS, fifo::op::UPDATE_META, in);
125}
126
39ae355f 127void part_init(lr::ObjectWriteOperation* op, fifo::data_params params)
f67539c2
TL
128{
129 fifo::op::init_part ip;
130
f67539c2
TL
131 ip.params = params;
132
133 cb::list in;
134 encode(ip, in);
135 op->exec(fifo::op::CLASS, fifo::op::INIT_PART, in);
136}
137
39ae355f 138int push_part(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid,
f67539c2
TL
139 std::deque<cb::list> data_bufs, std::uint64_t tid,
140 optional_yield y)
141{
142 lr::ObjectWriteOperation op;
143 fifo::op::push_part pp;
144
39ae355f
TL
145 op.assert_exists();
146
f67539c2
TL
147 pp.data_bufs = data_bufs;
148 pp.total_len = 0;
149
150 for (const auto& bl : data_bufs)
151 pp.total_len += bl.length();
152
153 cb::list in;
154 encode(pp, in);
155 auto retval = 0;
156 op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in, nullptr, &retval);
b3b6e05e 157 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y, lr::OPERATION_RETURNVEC);
f67539c2 158 if (r < 0) {
b3b6e05e 159 ldpp_dout(dpp, -1)
f67539c2
TL
160 << __PRETTY_FUNCTION__ << ":" << __LINE__
161 << " fifo::op::PUSH_PART failed r=" << r
162 << " tid=" << tid << dendl;
163 return r;
164 }
165 if (retval < 0) {
b3b6e05e 166 ldpp_dout(dpp, -1)
f67539c2
TL
167 << __PRETTY_FUNCTION__ << ":" << __LINE__
168 << " error handling response retval=" << retval
169 << " tid=" << tid << dendl;
170 }
171 return retval;
172}
173
39ae355f 174void push_part(lr::IoCtx& ioctx, const std::string& oid,
f67539c2
TL
175 std::deque<cb::list> data_bufs, std::uint64_t tid,
176 lr::AioCompletion* c)
177{
178 lr::ObjectWriteOperation op;
179 fifo::op::push_part pp;
180
f67539c2
TL
181 pp.data_bufs = data_bufs;
182 pp.total_len = 0;
183
184 for (const auto& bl : data_bufs)
185 pp.total_len += bl.length();
186
187 cb::list in;
188 encode(pp, in);
189 op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in);
190 auto r = ioctx.aio_operate(oid, c, &op, lr::OPERATION_RETURNVEC);
191 ceph_assert(r >= 0);
192}
193
194void trim_part(lr::ObjectWriteOperation* op,
f67539c2
TL
195 std::uint64_t ofs, bool exclusive)
196{
197 fifo::op::trim_part tp;
198
f67539c2
TL
199 tp.ofs = ofs;
200 tp.exclusive = exclusive;
201
202 cb::list in;
203 encode(tp, in);
204 op->exec(fifo::op::CLASS, fifo::op::TRIM_PART, in);
205}
206
b3b6e05e 207int list_part(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid,
39ae355f 208 std::uint64_t ofs, std::uint64_t max_entries,
f67539c2 209 std::vector<fifo::part_list_entry>* entries,
39ae355f 210 bool* more, bool* full_part,
f67539c2
TL
211 std::uint64_t tid, optional_yield y)
212{
213 lr::ObjectReadOperation op;
214 fifo::op::list_part lp;
215
f67539c2
TL
216 lp.ofs = ofs;
217 lp.max_entries = max_entries;
218
219 cb::list in;
220 encode(lp, in);
221 cb::list bl;
222 op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in, &bl, nullptr);
b3b6e05e 223 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, nullptr, y);
f67539c2
TL
224 if (r >= 0) try {
225 fifo::op::list_part_reply reply;
226 auto iter = bl.cbegin();
227 decode(reply, iter);
228 if (entries) *entries = std::move(reply.entries);
229 if (more) *more = reply.more;
230 if (full_part) *full_part = reply.full_part;
f67539c2 231 } catch (const cb::error& err) {
b3b6e05e 232 ldpp_dout(dpp, -1)
f67539c2
TL
233 << __PRETTY_FUNCTION__ << ":" << __LINE__
234 << " decode failed: " << err.what()
235 << " tid=" << tid << dendl;
236 r = from_error_code(err.code());
237 } else if (r != -ENOENT) {
b3b6e05e 238 ldpp_dout(dpp, -1)
f67539c2
TL
239 << __PRETTY_FUNCTION__ << ":" << __LINE__
240 << " fifo::op::LIST_PART failed r=" << r << " tid=" << tid
241 << dendl;
242 }
243 return r;
244}
245
246struct list_entry_completion : public lr::ObjectOperationCompletion {
247 CephContext* cct;
248 int* r_out;
249 std::vector<fifo::part_list_entry>* entries;
250 bool* more;
251 bool* full_part;
f67539c2
TL
252 std::uint64_t tid;
253
254 list_entry_completion(CephContext* cct, int* r_out, std::vector<fifo::part_list_entry>* entries,
39ae355f 255 bool* more, bool* full_part, std::uint64_t tid)
f67539c2 256 : cct(cct), r_out(r_out), entries(entries), more(more),
39ae355f 257 full_part(full_part), tid(tid) {}
f67539c2
TL
258 virtual ~list_entry_completion() = default;
259 void handle_completion(int r, bufferlist& bl) override {
260 if (r >= 0) try {
261 fifo::op::list_part_reply reply;
262 auto iter = bl.cbegin();
263 decode(reply, iter);
264 if (entries) *entries = std::move(reply.entries);
265 if (more) *more = reply.more;
266 if (full_part) *full_part = reply.full_part;
f67539c2
TL
267 } catch (const cb::error& err) {
268 lderr(cct)
269 << __PRETTY_FUNCTION__ << ":" << __LINE__
270 << " decode failed: " << err.what()
271 << " tid=" << tid << dendl;
272 r = from_error_code(err.code());
273 } else if (r < 0) {
274 lderr(cct)
275 << __PRETTY_FUNCTION__ << ":" << __LINE__
276 << " fifo::op::LIST_PART failed r=" << r << " tid=" << tid
277 << dendl;
278 }
279 if (r_out) *r_out = r;
280 }
281};
282
283lr::ObjectReadOperation list_part(CephContext* cct,
f67539c2
TL
284 std::uint64_t ofs,
285 std::uint64_t max_entries,
286 int* r_out,
287 std::vector<fifo::part_list_entry>* entries,
288 bool* more, bool* full_part,
39ae355f 289 std::uint64_t tid)
f67539c2
TL
290{
291 lr::ObjectReadOperation op;
292 fifo::op::list_part lp;
293
f67539c2
TL
294 lp.ofs = ofs;
295 lp.max_entries = max_entries;
296
297 cb::list in;
298 encode(lp, in);
299 op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in,
300 new list_entry_completion(cct, r_out, entries, more, full_part,
39ae355f 301 tid));
f67539c2
TL
302 return op;
303}
304
b3b6e05e 305int get_part_info(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid,
f67539c2
TL
306 fifo::part_header* header,
307 std::uint64_t tid, optional_yield y)
308{
309 lr::ObjectReadOperation op;
310 fifo::op::get_part_info gpi;
311
312 cb::list in;
313 cb::list bl;
314 encode(gpi, in);
315 op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in, &bl, nullptr);
b3b6e05e 316 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, nullptr, y);
f67539c2
TL
317 if (r >= 0) try {
318 fifo::op::get_part_info_reply reply;
319 auto iter = bl.cbegin();
320 decode(reply, iter);
321 if (header) *header = std::move(reply.header);
322 } catch (const cb::error& err) {
b3b6e05e 323 ldpp_dout(dpp, -1)
f67539c2
TL
324 << __PRETTY_FUNCTION__ << ":" << __LINE__
325 << " decode failed: " << err.what()
326 << " tid=" << tid << dendl;
327 r = from_error_code(err.code());
328 } else {
b3b6e05e 329 ldpp_dout(dpp, -1)
f67539c2
TL
330 << __PRETTY_FUNCTION__ << ":" << __LINE__
331 << " fifo::op::GET_PART_INFO failed r=" << r << " tid=" << tid
332 << dendl;
333 }
334 return r;
335}
336
337struct partinfo_completion : public lr::ObjectOperationCompletion {
338 CephContext* cct;
339 int* rp;
340 fifo::part_header* h;
341 std::uint64_t tid;
342 partinfo_completion(CephContext* cct, int* rp, fifo::part_header* h,
343 std::uint64_t tid) :
344 cct(cct), rp(rp), h(h), tid(tid) {
345 }
346 virtual ~partinfo_completion() = default;
347 void handle_completion(int r, bufferlist& bl) override {
348 if (r >= 0) try {
349 fifo::op::get_part_info_reply reply;
350 auto iter = bl.cbegin();
351 decode(reply, iter);
352 if (h) *h = std::move(reply.header);
353 } catch (const cb::error& err) {
354 r = from_error_code(err.code());
355 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
356 << " decode failed: " << err.what()
357 << " tid=" << tid << dendl;
358 } else {
359 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
360 << " fifo::op::GET_PART_INFO failed r=" << r << " tid=" << tid
361 << dendl;
362 }
363 if (rp) {
364 *rp = r;
365 }
366 }
367};
368
369lr::ObjectReadOperation get_part_info(CephContext* cct,
370 fifo::part_header* header,
371 std::uint64_t tid, int* r = 0)
372{
373 lr::ObjectReadOperation op;
374 fifo::op::get_part_info gpi;
375
376 cb::list in;
377 cb::list bl;
378 encode(gpi, in);
379 op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in,
380 new partinfo_completion(cct, r, header, tid));
381 return op;
382}
383}
384
385std::optional<marker> FIFO::to_marker(std::string_view s)
386{
387 marker m;
388 if (s.empty()) {
389 m.num = info.tail_part_num;
390 m.ofs = 0;
391 return m;
392 }
393
394 auto pos = s.find(':');
20effc67 395 if (pos == s.npos) {
f67539c2
TL
396 return std::nullopt;
397 }
398
399 auto num = s.substr(0, pos);
400 auto ofs = s.substr(pos + 1);
401
402 auto n = ceph::parse<decltype(m.num)>(num);
403 if (!n) {
404 return std::nullopt;
405 }
406 m.num = *n;
407 auto o = ceph::parse<decltype(m.ofs)>(ofs);
408 if (!o) {
409 return std::nullopt;
410 }
411 m.ofs = *o;
412 return m;
413}
414
20effc67
TL
415int FIFO::apply_update(const DoutPrefixProvider *dpp,
416 fifo::info* info,
f67539c2
TL
417 const fifo::objv& objv,
418 const fifo::update& update,
419 std::uint64_t tid)
420{
20effc67 421 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
422 << " entering: tid=" << tid << dendl;
423 std::unique_lock l(m);
424 if (objv != info->version) {
20effc67 425 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
39ae355f 426 << " version mismatch, canceling: tid=" << tid << dendl;
f67539c2
TL
427 return -ECANCELED;
428 }
429
39ae355f 430 info->apply_update(update);
f67539c2
TL
431 return {};
432}
433
b3b6e05e 434int FIFO::_update_meta(const DoutPrefixProvider *dpp, const fifo::update& update,
f67539c2
TL
435 fifo::objv version, bool* pcanceled,
436 std::uint64_t tid, optional_yield y)
437{
b3b6e05e 438 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
439 << " entering: tid=" << tid << dendl;
440 lr::ObjectWriteOperation op;
441 bool canceled = false;
39ae355f 442 update_meta(&op, version, update);
b3b6e05e 443 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y);
f67539c2
TL
444 if (r >= 0 || r == -ECANCELED) {
445 canceled = (r == -ECANCELED);
446 if (!canceled) {
20effc67 447 r = apply_update(dpp, &info, version, update, tid);
f67539c2
TL
448 if (r < 0) canceled = true;
449 }
450 if (canceled) {
b3b6e05e 451 r = read_meta(dpp, tid, y);
f67539c2
TL
452 canceled = r < 0 ? false : true;
453 }
454 }
455 if (pcanceled) *pcanceled = canceled;
456 if (canceled) {
b3b6e05e 457 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
458 << " canceled: tid=" << tid << dendl;
459 }
460 if (r < 0) {
b3b6e05e 461 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
462 << " returning error: r=" << r << " tid=" << tid << dendl;
463 }
464 return r;
465}
466
467struct Updater : public Completion<Updater> {
468 FIFO* fifo;
469 fifo::update update;
470 fifo::objv version;
471 bool reread = false;
472 bool* pcanceled = nullptr;
473 std::uint64_t tid;
b3b6e05e 474 Updater(const DoutPrefixProvider *dpp, FIFO* fifo, lr::AioCompletion* super,
f67539c2
TL
475 const fifo::update& update, fifo::objv version,
476 bool* pcanceled, std::uint64_t tid)
b3b6e05e 477 : Completion(dpp, super), fifo(fifo), update(update), version(version),
f67539c2
TL
478 pcanceled(pcanceled) {}
479
b3b6e05e
TL
480 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
481 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
482 << " entering: tid=" << tid << dendl;
483 if (reread)
20effc67 484 handle_reread(dpp, std::move(p), r);
f67539c2 485 else
b3b6e05e 486 handle_update(dpp, std::move(p), r);
f67539c2
TL
487 }
488
b3b6e05e
TL
489 void handle_update(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
490 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
491 << " handling async update_meta: tid="
492 << tid << dendl;
493 if (r < 0 && r != -ECANCELED) {
b3b6e05e 494 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
495 << " update failed: r=" << r << " tid=" << tid << dendl;
496 complete(std::move(p), r);
497 return;
498 }
499 bool canceled = (r == -ECANCELED);
500 if (!canceled) {
20effc67 501 int r = fifo->apply_update(dpp, &fifo->info, version, update, tid);
f67539c2 502 if (r < 0) {
b3b6e05e 503 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
504 << " update failed, marking canceled: r=" << r
505 << " tid=" << tid << dendl;
506 canceled = true;
507 }
508 }
509 if (canceled) {
510 reread = true;
b3b6e05e 511 fifo->read_meta(dpp, tid, call(std::move(p)));
f67539c2
TL
512 return;
513 }
514 if (pcanceled)
515 *pcanceled = false;
b3b6e05e 516 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
517 << " completing: tid=" << tid << dendl;
518 complete(std::move(p), 0);
519 }
520
20effc67
TL
521 void handle_reread(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
522 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
523 << " handling async read_meta: tid="
524 << tid << dendl;
525 if (r < 0 && pcanceled) {
526 *pcanceled = false;
527 } else if (r >= 0 && pcanceled) {
528 *pcanceled = true;
529 }
530 if (r < 0) {
20effc67 531 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
532 << " failed dispatching read_meta: r=" << r << " tid="
533 << tid << dendl;
534 } else {
20effc67 535 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
536 << " completing: tid=" << tid << dendl;
537 }
538 complete(std::move(p), r);
539 }
540};
541
b3b6e05e 542void FIFO::_update_meta(const DoutPrefixProvider *dpp, const fifo::update& update,
f67539c2
TL
543 fifo::objv version, bool* pcanceled,
544 std::uint64_t tid, lr::AioCompletion* c)
545{
b3b6e05e 546 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
547 << " entering: tid=" << tid << dendl;
548 lr::ObjectWriteOperation op;
549 update_meta(&op, info.version, update);
b3b6e05e 550 auto updater = std::make_unique<Updater>(dpp, this, c, update, version, pcanceled,
f67539c2
TL
551 tid);
552 auto r = ioctx.aio_operate(oid, Updater::call(std::move(updater)), &op);
553 assert(r >= 0);
554}
555
39ae355f 556int FIFO::create_part(const DoutPrefixProvider *dpp, int64_t part_num, std::uint64_t tid,
f67539c2
TL
557 optional_yield y)
558{
b3b6e05e 559 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
560 << " entering: tid=" << tid << dendl;
561 lr::ObjectWriteOperation op;
562 op.create(false); /* We don't need exclusivity, part_init ensures
563 we're creating from the same journal entry. */
564 std::unique_lock l(m);
39ae355f 565 part_init(&op, info.params);
f67539c2
TL
566 auto oid = info.part_oid(part_num);
567 l.unlock();
b3b6e05e 568 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y);
f67539c2 569 if (r < 0) {
b3b6e05e 570 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
571 << " part_init failed: r=" << r << " tid="
572 << tid << dendl;
573 }
574 return r;
575}
576
39ae355f 577int FIFO::remove_part(const DoutPrefixProvider *dpp, int64_t part_num, std::uint64_t tid,
f67539c2
TL
578 optional_yield y)
579{
b3b6e05e 580 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
581 << " entering: tid=" << tid << dendl;
582 lr::ObjectWriteOperation op;
583 op.remove();
584 std::unique_lock l(m);
585 auto oid = info.part_oid(part_num);
586 l.unlock();
b3b6e05e 587 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y);
f67539c2 588 if (r < 0) {
b3b6e05e 589 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
590 << " remove failed: r=" << r << " tid="
591 << tid << dendl;
592 }
593 return r;
594}
595
b3b6e05e 596int FIFO::process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y)
f67539c2 597{
b3b6e05e 598 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
599 << " entering: tid=" << tid << dendl;
600 std::vector<fifo::journal_entry> processed;
601
602 std::unique_lock l(m);
603 auto tmpjournal = info.journal;
604 auto new_tail = info.tail_part_num;
605 auto new_head = info.head_part_num;
606 auto new_max = info.max_push_part_num;
607 l.unlock();
608
609 int r = 0;
39ae355f 610 for (auto& entry : tmpjournal) {
b3b6e05e 611 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
612 << " processing entry: entry=" << entry << " tid=" << tid
613 << dendl;
614 switch (entry.op) {
615 case fifo::journal_entry::Op::create:
39ae355f 616 r = create_part(dpp, entry.part_num, tid, y);
f67539c2
TL
617 if (entry.part_num > new_max) {
618 new_max = entry.part_num;
619 }
620 break;
621 case fifo::journal_entry::Op::set_head:
622 r = 0;
623 if (entry.part_num > new_head) {
624 new_head = entry.part_num;
625 }
626 break;
627 case fifo::journal_entry::Op::remove:
39ae355f 628 r = remove_part(dpp, entry.part_num, tid, y);
f67539c2
TL
629 if (r == -ENOENT) r = 0;
630 if (entry.part_num >= new_tail) {
631 new_tail = entry.part_num + 1;
632 }
633 break;
634 default:
b3b6e05e 635 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
636 << " unknown journaled op: entry=" << entry << " tid="
637 << tid << dendl;
638 return -EIO;
639 }
640
641 if (r < 0) {
b3b6e05e 642 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
643 << " processing entry failed: entry=" << entry
644 << " r=" << r << " tid=" << tid << dendl;
645 return -r;
646 }
647
648 processed.push_back(std::move(entry));
649 }
650
651 // Postprocess
652 bool canceled = true;
653
654 for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) {
b3b6e05e 655 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
656 << " postprocessing: i=" << i << " tid=" << tid << dendl;
657
658 std::optional<int64_t> tail_part_num;
659 std::optional<int64_t> head_part_num;
660 std::optional<int64_t> max_part_num;
661
662 std::unique_lock l(m);
663 auto objv = info.version;
664 if (new_tail > tail_part_num) tail_part_num = new_tail;
665 if (new_head > info.head_part_num) head_part_num = new_head;
666 if (new_max > info.max_push_part_num) max_part_num = new_max;
667 l.unlock();
668
669 if (processed.empty() &&
670 !tail_part_num &&
671 !max_part_num) {
b3b6e05e 672 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
673 << " nothing to update any more: i=" << i << " tid="
674 << tid << dendl;
675 canceled = false;
676 break;
677 }
678 auto u = fifo::update().tail_part_num(tail_part_num)
679 .head_part_num(head_part_num).max_push_part_num(max_part_num)
680 .journal_entries_rm(processed);
b3b6e05e 681 r = _update_meta(dpp, u, objv, &canceled, tid, y);
f67539c2 682 if (r < 0) {
b3b6e05e 683 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
684 << " _update_meta failed: update=" << u
685 << " r=" << r << " tid=" << tid << dendl;
686 break;
687 }
688
689 if (canceled) {
690 std::vector<fifo::journal_entry> new_processed;
691 std::unique_lock l(m);
b3b6e05e 692 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
693 << " update canceled, retrying: i=" << i << " tid="
694 << tid << dendl;
695 for (auto& e : processed) {
39ae355f
TL
696 if (info.journal.contains(e)) {
697 new_processed.push_back(e);
f67539c2 698 }
f67539c2
TL
699 }
700 processed = std::move(new_processed);
701 }
702 }
703 if (r == 0 && canceled) {
b3b6e05e 704 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
705 << " canceled too many times, giving up: tid=" << tid << dendl;
706 r = -ECANCELED;
707 }
708 if (r < 0) {
b3b6e05e 709 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
710 << " failed, r=: " << r << " tid=" << tid << dendl;
711 }
712 return r;
713}
714
39ae355f
TL
715int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp,
716 std::int64_t new_part_num, bool is_head,
717 std::uint64_t tid, optional_yield y)
f67539c2 718{
b3b6e05e 719 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
720 << " entering: tid=" << tid << dendl;
721 std::unique_lock l(m);
39ae355f
TL
722 std::vector<fifo::journal_entry> jentries{{ fifo::journal_entry::Op::create, new_part_num }};
723 if (info.journal.contains({fifo::journal_entry::Op::create, new_part_num}) &&
724 (!is_head || info.journal.contains({fifo::journal_entry::Op::set_head, new_part_num}))) {
f67539c2 725 l.unlock();
b3b6e05e 726 ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
727 << " new part journaled, but not processed: tid="
728 << tid << dendl;
b3b6e05e 729 auto r = process_journal(dpp, tid, y);
f67539c2 730 if (r < 0) {
b3b6e05e 731 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
732 << " process_journal failed: r=" << r << " tid=" << tid << dendl;
733 }
734 return r;
735 }
f67539c2
TL
736 auto version = info.version;
737
738 if (is_head) {
b3b6e05e 739 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 740 << " needs new head: tid=" << tid << dendl;
39ae355f 741 jentries.push_back({ fifo::journal_entry::Op::set_head, new_part_num });
f67539c2
TL
742 }
743 l.unlock();
744
745 int r = 0;
746 bool canceled = true;
747 for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) {
748 canceled = false;
b3b6e05e 749 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
750 << " updating metadata: i=" << i << " tid=" << tid << dendl;
751 auto u = fifo::update{}.journal_entries_add(jentries);
b3b6e05e 752 r = _update_meta(dpp, u, version, &canceled, tid, y);
f67539c2
TL
753 if (r >= 0 && canceled) {
754 std::unique_lock l(m);
39ae355f
TL
755 version = info.version;
756 auto found = (info.journal.contains({fifo::journal_entry::Op::create, new_part_num}) ||
757 info.journal.contains({fifo::journal_entry::Op::set_head, new_part_num}));
758 if ((info.max_push_part_num >= new_part_num &&
759 info.head_part_num >= new_part_num)) {
b3b6e05e 760 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
761 << " raced, but journaled and processed: i=" << i
762 << " tid=" << tid << dendl;
763 return 0;
764 }
765 if (found) {
b3b6e05e 766 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
767 << " raced, journaled but not processed: i=" << i
768 << " tid=" << tid << dendl;
769 canceled = false;
770 }
771 l.unlock();
772 }
773 if (r < 0) {
b3b6e05e 774 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
775 << " _update_meta failed: update=" << u << " r=" << r
776 << " tid=" << tid << dendl;
777 return r;
778 }
779 }
780 if (canceled) {
b3b6e05e 781 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
782 << " canceled too many times, giving up: tid=" << tid << dendl;
783 return -ECANCELED;
784 }
b3b6e05e 785 r = process_journal(dpp, tid, y);
f67539c2 786 if (r < 0) {
b3b6e05e 787 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
788 << " process_journal failed: r=" << r << " tid=" << tid << dendl;
789 }
790 return r;
791}
792
39ae355f
TL
793int FIFO::_prepare_new_head(const DoutPrefixProvider *dpp,
794 std::int64_t new_head_part_num,
795 std::uint64_t tid, optional_yield y)
f67539c2 796{
b3b6e05e 797 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
39ae355f 798 << " entering: tid=" << tid << dendl;
f67539c2 799 std::unique_lock l(m);
f67539c2
TL
800 auto max_push_part_num = info.max_push_part_num;
801 auto version = info.version;
802 l.unlock();
803
804 int r = 0;
39ae355f 805 if (max_push_part_num < new_head_part_num) {
b3b6e05e 806 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 807 << " need new part: tid=" << tid << dendl;
39ae355f 808 r = _prepare_new_part(dpp, new_head_part_num, true, tid, y);
f67539c2 809 if (r < 0) {
b3b6e05e 810 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
811 << " _prepare_new_part failed: r=" << r
812 << " tid=" << tid << dendl;
813 return r;
814 }
815 std::unique_lock l(m);
39ae355f 816 if (info.max_push_part_num < new_head_part_num) {
b3b6e05e 817 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
818 << " inconsistency, push part less than head part: "
819 << " tid=" << tid << dendl;
820 return -EIO;
821 }
822 l.unlock();
823 return 0;
824 }
825
39ae355f
TL
826 fifo::journal_entry jentry;
827 jentry.op = fifo::journal_entry::Op::set_head;
828 jentry.part_num = new_head_part_num;
829
830 r = 0;
f67539c2
TL
831 bool canceled = true;
832 for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) {
39ae355f 833 canceled = false;
b3b6e05e 834 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
39ae355f
TL
835 << " updating metadata: i=" << i << " tid=" << tid << dendl;
836 auto u = fifo::update{}.journal_entries_add({{ jentry }});
b3b6e05e 837 r = _update_meta(dpp, u, version, &canceled, tid, y);
39ae355f
TL
838 if (r >= 0 && canceled) {
839 std::unique_lock l(m);
840 auto found = (info.journal.contains({fifo::journal_entry::Op::create, new_head_part_num}) ||
841 info.journal.contains({fifo::journal_entry::Op::set_head, new_head_part_num}));
842 version = info.version;
843 if ((info.head_part_num >= new_head_part_num)) {
844 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
845 << " raced, but journaled and processed: i=" << i
846 << " tid=" << tid << dendl;
847 return 0;
848 }
849 if (found) {
850 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
851 << " raced, journaled but not processed: i=" << i
852 << " tid=" << tid << dendl;
853 canceled = false;
854 }
855 l.unlock();
856 }
f67539c2 857 if (r < 0) {
b3b6e05e 858 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
859 << " _update_meta failed: update=" << u << " r=" << r
860 << " tid=" << tid << dendl;
861 return r;
862 }
f67539c2
TL
863 }
864 if (canceled) {
b3b6e05e 865 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
866 << " canceled too many times, giving up: tid=" << tid << dendl;
867 return -ECANCELED;
868 }
39ae355f
TL
869 r = process_journal(dpp, tid, y);
870 if (r < 0) {
871 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
872 << " process_journal failed: r=" << r << " tid=" << tid << dendl;
873 }
874 return r;
f67539c2
TL
875}
876
877struct NewPartPreparer : public Completion<NewPartPreparer> {
878 FIFO* f;
879 std::vector<fifo::journal_entry> jentries;
880 int i = 0;
39ae355f 881 std::int64_t new_part_num;
f67539c2
TL
882 bool canceled = false;
883 uint64_t tid;
884
b3b6e05e 885 NewPartPreparer(const DoutPrefixProvider *dpp, FIFO* f, lr::AioCompletion* super,
f67539c2 886 std::vector<fifo::journal_entry> jentries,
39ae355f 887 std::int64_t new_part_num,
f67539c2 888 std::uint64_t tid)
b3b6e05e 889 : Completion(dpp, super), f(f), jentries(std::move(jentries)),
39ae355f 890 new_part_num(new_part_num), tid(tid) {}
f67539c2 891
b3b6e05e
TL
892 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
893 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
894 << " entering: tid=" << tid << dendl;
895 if (r < 0) {
b3b6e05e 896 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
897 << " _update_meta failed: r=" << r
898 << " tid=" << tid << dendl;
899 complete(std::move(p), r);
900 return;
901 }
902
903 if (canceled) {
904 std::unique_lock l(f->m);
39ae355f
TL
905 auto found = (f->info.journal.contains({fifo::journal_entry::Op::create, new_part_num}) ||
906 f->info.journal.contains({fifo::journal_entry::Op::set_head, new_part_num}));
f67539c2
TL
907 auto max_push_part_num = f->info.max_push_part_num;
908 auto head_part_num = f->info.head_part_num;
909 auto version = f->info.version;
f67539c2 910 l.unlock();
39ae355f
TL
911 if ((max_push_part_num >= new_part_num &&
912 head_part_num >= new_part_num)) {
b3b6e05e 913 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
914 << " raced, but journaled and processed: i=" << i
915 << " tid=" << tid << dendl;
916 complete(std::move(p), 0);
917 return;
918 }
919 if (i >= MAX_RACE_RETRIES) {
920 complete(std::move(p), -ECANCELED);
921 return;
922 }
923 if (!found) {
924 ++i;
b3b6e05e 925 f->_update_meta(dpp, fifo::update{}
f67539c2
TL
926 .journal_entries_add(jentries),
927 version, &canceled, tid, call(std::move(p)));
928 return;
929 } else {
b3b6e05e 930 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
931 << " raced, journaled but not processed: i=" << i
932 << " tid=" << tid << dendl;
933 canceled = false;
934 }
935 // Fall through. We still need to process the journal.
936 }
b3b6e05e 937 f->process_journal(dpp, tid, super());
f67539c2
TL
938 return;
939 }
940};
941
39ae355f
TL
942void FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, std::int64_t new_part_num,
943 bool is_head, std::uint64_t tid, lr::AioCompletion* c)
f67539c2
TL
944{
945 std::unique_lock l(m);
39ae355f
TL
946 std::vector<fifo::journal_entry> jentries{{fifo::journal_entry::Op::create, new_part_num}};
947 if (info.journal.contains({fifo::journal_entry::Op::create, new_part_num}) &&
948 (!is_head || info.journal.contains({fifo::journal_entry::Op::set_head, new_part_num}))) {
f67539c2 949 l.unlock();
b3b6e05e 950 ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
951 << " new part journaled, but not processed: tid="
952 << tid << dendl;
b3b6e05e 953 process_journal(dpp, tid, c);
f67539c2
TL
954 return;
955 }
f67539c2
TL
956 auto version = info.version;
957
958 if (is_head) {
39ae355f 959 jentries.push_back({ fifo::journal_entry::Op::set_head, new_part_num });
f67539c2
TL
960 }
961 l.unlock();
962
b3b6e05e 963 auto n = std::make_unique<NewPartPreparer>(dpp, this, c, jentries,
39ae355f 964 new_part_num, tid);
f67539c2 965 auto np = n.get();
b3b6e05e 966 _update_meta(dpp, fifo::update{}.journal_entries_add(jentries), version,
f67539c2
TL
967 &np->canceled, tid, NewPartPreparer::call(std::move(n)));
968}
969
970struct NewHeadPreparer : public Completion<NewHeadPreparer> {
971 FIFO* f;
972 int i = 0;
973 bool newpart;
39ae355f 974 std::int64_t new_head_part_num;
f67539c2
TL
975 bool canceled = false;
976 std::uint64_t tid;
977
b3b6e05e 978 NewHeadPreparer(const DoutPrefixProvider *dpp, FIFO* f, lr::AioCompletion* super,
39ae355f
TL
979 bool newpart, std::int64_t new_head_part_num,
980 std::uint64_t tid)
981 : Completion(dpp, super), f(f), newpart(newpart),
982 new_head_part_num(new_head_part_num), tid(tid) {}
f67539c2 983
b3b6e05e 984 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
f67539c2
TL
985 if (newpart)
986 handle_newpart(std::move(p), r);
987 else
b3b6e05e 988 handle_update(dpp, std::move(p), r);
f67539c2
TL
989 }
990
991 void handle_newpart(Ptr&& p, int r) {
992 if (r < 0) {
993 lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
994 << " _prepare_new_part failed: r=" << r
995 << " tid=" << tid << dendl;
996 complete(std::move(p), r);
997 return;
998 }
999 std::unique_lock l(f->m);
39ae355f 1000 if (f->info.max_push_part_num < new_head_part_num) {
f67539c2
TL
1001 l.unlock();
1002 lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1003 << " _prepare_new_part failed: r=" << r
1004 << " tid=" << tid << dendl;
1005 complete(std::move(p), -EIO);
1006 } else {
1007 l.unlock();
1008 complete(std::move(p), 0);
1009 }
1010 }
1011
b3b6e05e 1012 void handle_update(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
39ae355f
TL
1013 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1014 << " entering: tid=" << tid << dendl;
f67539c2 1015 if (r < 0) {
b3b6e05e 1016 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
39ae355f 1017 << " _update_meta failed: r=" << r
f67539c2
TL
1018 << " tid=" << tid << dendl;
1019 complete(std::move(p), r);
1020 return;
1021 }
39ae355f 1022
f67539c2 1023 if (canceled) {
39ae355f
TL
1024 std::unique_lock l(f->m);
1025 auto found = (f->info.journal.contains({fifo::journal_entry::Op::create, new_head_part_num }) ||
1026 f->info.journal.contains({fifo::journal_entry::Op::set_head, new_head_part_num }));
1027 auto head_part_num = f->info.head_part_num;
1028 auto version = f->info.version;
1029
1030 l.unlock();
1031 if ((head_part_num >= new_head_part_num)) {
1032 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1033 << " raced, but journaled and processed: i=" << i
1034 << " tid=" << tid << dendl;
1035 complete(std::move(p), 0);
1036 return;
1037 }
f67539c2 1038 if (i >= MAX_RACE_RETRIES) {
f67539c2
TL
1039 complete(std::move(p), -ECANCELED);
1040 return;
1041 }
39ae355f 1042 if (!found) {
f67539c2 1043 ++i;
39ae355f
TL
1044 fifo::journal_entry jentry;
1045 jentry.op = fifo::journal_entry::Op::set_head;
1046 jentry.part_num = new_head_part_num;
1047 f->_update_meta(dpp, fifo::update{}
1048 .journal_entries_add({{jentry}}),
1049 version, &canceled, tid, call(std::move(p)));
f67539c2 1050 return;
39ae355f
TL
1051 } else {
1052 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1053 << " raced, journaled but not processed: i=" << i
1054 << " tid=" << tid << dendl;
1055 canceled = false;
f67539c2 1056 }
39ae355f 1057 // Fall through. We still need to process the journal.
f67539c2 1058 }
39ae355f 1059 f->process_journal(dpp, tid, super());
f67539c2
TL
1060 return;
1061 }
1062};
1063
39ae355f
TL
1064void FIFO::_prepare_new_head(const DoutPrefixProvider *dpp, std::int64_t new_head_part_num,
1065 std::uint64_t tid, lr::AioCompletion* c)
f67539c2 1066{
b3b6e05e 1067 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1068 << " entering: tid=" << tid << dendl;
1069 std::unique_lock l(m);
f67539c2
TL
1070 auto max_push_part_num = info.max_push_part_num;
1071 auto version = info.version;
1072 l.unlock();
1073
39ae355f 1074 if (max_push_part_num < new_head_part_num) {
b3b6e05e 1075 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 1076 << " need new part: tid=" << tid << dendl;
39ae355f 1077 auto n = std::make_unique<NewHeadPreparer>(dpp, this, c, true, new_head_part_num,
f67539c2 1078 tid);
39ae355f
TL
1079 _prepare_new_part(dpp, new_head_part_num, true, tid,
1080 NewHeadPreparer::call(std::move(n)));
f67539c2 1081 } else {
b3b6e05e 1082 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 1083 << " updating head: tid=" << tid << dendl;
39ae355f 1084 auto n = std::make_unique<NewHeadPreparer>(dpp, this, c, false, new_head_part_num,
f67539c2
TL
1085 tid);
1086 auto np = n.get();
39ae355f
TL
1087 fifo::journal_entry jentry;
1088 jentry.op = fifo::journal_entry::Op::set_head;
1089 jentry.part_num = new_head_part_num;
1090 _update_meta(dpp, fifo::update{}.journal_entries_add({{jentry}}), version,
f67539c2
TL
1091 &np->canceled, tid, NewHeadPreparer::call(std::move(n)));
1092 }
1093}
1094
b3b6e05e 1095int FIFO::push_entries(const DoutPrefixProvider *dpp, const std::deque<cb::list>& data_bufs,
f67539c2
TL
1096 std::uint64_t tid, optional_yield y)
1097{
b3b6e05e 1098 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1099 << " entering: tid=" << tid << dendl;
1100 std::unique_lock l(m);
1101 auto head_part_num = info.head_part_num;
f67539c2
TL
1102 const auto part_oid = info.part_oid(head_part_num);
1103 l.unlock();
1104
39ae355f 1105 auto r = push_part(dpp, ioctx, part_oid, data_bufs, tid, y);
f67539c2 1106 if (r < 0) {
b3b6e05e 1107 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1108 << " push_part failed: r=" << r << " tid=" << tid << dendl;
1109 }
1110 return r;
1111}
1112
1113void FIFO::push_entries(const std::deque<cb::list>& data_bufs,
1114 std::uint64_t tid, lr::AioCompletion* c)
1115{
1116 std::unique_lock l(m);
1117 auto head_part_num = info.head_part_num;
f67539c2
TL
1118 const auto part_oid = info.part_oid(head_part_num);
1119 l.unlock();
1120
39ae355f 1121 push_part(ioctx, part_oid, data_bufs, tid, c);
f67539c2
TL
1122}
1123
b3b6e05e 1124int FIFO::trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs,
f67539c2
TL
1125 bool exclusive, std::uint64_t tid,
1126 optional_yield y)
1127{
b3b6e05e 1128 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1129 << " entering: tid=" << tid << dendl;
1130 lr::ObjectWriteOperation op;
1131 std::unique_lock l(m);
1132 const auto part_oid = info.part_oid(part_num);
1133 l.unlock();
39ae355f 1134 rgw::cls::fifo::trim_part(&op, ofs, exclusive);
b3b6e05e 1135 auto r = rgw_rados_operate(dpp, ioctx, part_oid, &op, y);
f67539c2 1136 if (r < 0) {
b3b6e05e 1137 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1138 << " trim_part failed: r=" << r << " tid=" << tid << dendl;
1139 }
1140 return 0;
1141}
1142
20effc67 1143void FIFO::trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs,
f67539c2
TL
1144 bool exclusive, std::uint64_t tid,
1145 lr::AioCompletion* c)
1146{
20effc67 1147 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1148 << " entering: tid=" << tid << dendl;
1149 lr::ObjectWriteOperation op;
1150 std::unique_lock l(m);
1151 const auto part_oid = info.part_oid(part_num);
1152 l.unlock();
39ae355f 1153 rgw::cls::fifo::trim_part(&op, ofs, exclusive);
f67539c2
TL
1154 auto r = ioctx.aio_operate(part_oid, c, &op);
1155 ceph_assert(r >= 0);
1156}
1157
b3b6e05e 1158int FIFO::open(const DoutPrefixProvider *dpp, lr::IoCtx ioctx, std::string oid, std::unique_ptr<FIFO>* fifo,
f67539c2
TL
1159 optional_yield y, std::optional<fifo::objv> objv,
1160 bool probe)
1161{
b3b6e05e 1162 ldpp_dout(dpp, 20)
f67539c2
TL
1163 << __PRETTY_FUNCTION__ << ":" << __LINE__
1164 << " entering" << dendl;
1165 fifo::info info;
1166 std::uint32_t size;
1167 std::uint32_t over;
b3b6e05e 1168 int r = get_meta(dpp, ioctx, std::move(oid), objv, &info, &size, &over, 0, y,
f67539c2
TL
1169 probe);
1170 if (r < 0) {
1171 if (!(probe && (r == -ENOENT || r == -ENODATA))) {
b3b6e05e 1172 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1173 << " get_meta failed: r=" << r << dendl;
1174 }
1175 return r;
1176 }
1177 std::unique_ptr<FIFO> f(new FIFO(std::move(ioctx), oid));
1178 f->info = info;
1179 f->part_header_size = size;
1180 f->part_entry_overhead = over;
1181 // If there are journal entries, process them, in case
1182 // someone crashed mid-transaction.
1183 if (!info.journal.empty()) {
b3b6e05e 1184 ldpp_dout(dpp, 20)
f67539c2
TL
1185 << __PRETTY_FUNCTION__ << ":" << __LINE__
1186 << " processing leftover journal" << dendl;
b3b6e05e 1187 r = f->process_journal(dpp, 0, y);
f67539c2 1188 if (r < 0) {
b3b6e05e 1189 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1190 << " process_journal failed: r=" << r << dendl;
1191 return r;
1192 }
1193 }
1194 *fifo = std::move(f);
1195 return 0;
1196}
1197
b3b6e05e 1198int FIFO::create(const DoutPrefixProvider *dpp, lr::IoCtx ioctx, std::string oid, std::unique_ptr<FIFO>* fifo,
f67539c2
TL
1199 optional_yield y, std::optional<fifo::objv> objv,
1200 std::optional<std::string_view> oid_prefix,
1201 bool exclusive, std::uint64_t max_part_size,
1202 std::uint64_t max_entry_size)
1203{
b3b6e05e 1204 ldpp_dout(dpp, 20)
f67539c2
TL
1205 << __PRETTY_FUNCTION__ << ":" << __LINE__
1206 << " entering" << dendl;
1207 lr::ObjectWriteOperation op;
1208 create_meta(&op, oid, objv, oid_prefix, exclusive, max_part_size,
1209 max_entry_size);
b3b6e05e 1210 auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y);
f67539c2 1211 if (r < 0) {
b3b6e05e 1212 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1213 << " create_meta failed: r=" << r << dendl;
1214 return r;
1215 }
b3b6e05e 1216 r = open(dpp, std::move(ioctx), std::move(oid), fifo, y, objv);
f67539c2
TL
1217 return r;
1218}
1219
b3b6e05e
TL
1220int FIFO::read_meta(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y) {
1221 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1222 << " entering: tid=" << tid << dendl;
1223 fifo::info _info;
1224 std::uint32_t _phs;
1225 std::uint32_t _peo;
1226
20effc67 1227 auto r = get_meta(dpp, ioctx, oid, std::nullopt, &_info, &_phs, &_peo, tid, y);
f67539c2 1228 if (r < 0) {
b3b6e05e 1229 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1230 << " get_meta failed: r=" << r << " tid=" << tid << dendl;
1231 return r;
1232 }
1233 std::unique_lock l(m);
1234 // We have a newer version already!
1235 if (_info.version.same_or_later(this->info.version)) {
1236 info = std::move(_info);
1237 part_header_size = _phs;
1238 part_entry_overhead = _peo;
1239 }
1240 return 0;
1241}
1242
b3b6e05e 1243int FIFO::read_meta(const DoutPrefixProvider *dpp, optional_yield y) {
f67539c2
TL
1244 std::unique_lock l(m);
1245 auto tid = ++next_tid;
1246 l.unlock();
b3b6e05e 1247 return read_meta(dpp, tid, y);
f67539c2
TL
1248}
1249
1250struct Reader : public Completion<Reader> {
1251 FIFO* fifo;
1252 cb::list bl;
1253 std::uint64_t tid;
b3b6e05e
TL
1254 Reader(const DoutPrefixProvider *dpp, FIFO* fifo, lr::AioCompletion* super, std::uint64_t tid)
1255 : Completion(dpp, super), fifo(fifo), tid(tid) {}
f67539c2 1256
b3b6e05e
TL
1257 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
1258 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1259 << " entering: tid=" << tid << dendl;
1260 if (r >= 0) try {
1261 fifo::op::get_meta_reply reply;
1262 auto iter = bl.cbegin();
1263 decode(reply, iter);
1264 std::unique_lock l(fifo->m);
1265 if (reply.info.version.same_or_later(fifo->info.version)) {
1266 fifo->info = std::move(reply.info);
1267 fifo->part_header_size = reply.part_header_size;
1268 fifo->part_entry_overhead = reply.part_entry_overhead;
1269 }
1270 } catch (const cb::error& err) {
b3b6e05e 1271 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1272 << " failed to decode response err=" << err.what()
1273 << " tid=" << tid << dendl;
1274 r = from_error_code(err.code());
1275 } else {
b3b6e05e 1276 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1277 << " read_meta failed r=" << r
1278 << " tid=" << tid << dendl;
1279 }
1280 complete(std::move(p), r);
1281 }
1282};
1283
b3b6e05e 1284void FIFO::read_meta(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c)
f67539c2 1285{
b3b6e05e 1286 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1287 << " entering: tid=" << tid << dendl;
1288 lr::ObjectReadOperation op;
1289 fifo::op::get_meta gm;
1290 cb::list in;
1291 encode(gm, in);
b3b6e05e 1292 auto reader = std::make_unique<Reader>(dpp, this, c, tid);
f67539c2
TL
1293 auto rp = reader.get();
1294 auto r = ioctx.aio_exec(oid, Reader::call(std::move(reader)), fifo::op::CLASS,
1295 fifo::op::GET_META, in, &rp->bl);
1296 assert(r >= 0);
1297}
1298
1299const fifo::info& FIFO::meta() const {
1300 return info;
1301}
1302
1303std::pair<std::uint32_t, std::uint32_t> FIFO::get_part_layout_info() const {
1304 return {part_header_size, part_entry_overhead};
1305}
1306
b3b6e05e
TL
1307int FIFO::push(const DoutPrefixProvider *dpp, const cb::list& bl, optional_yield y) {
1308 return push(dpp, std::vector{ bl }, y);
f67539c2
TL
1309}
1310
b3b6e05e
TL
1311void FIFO::push(const DoutPrefixProvider *dpp, const cb::list& bl, lr::AioCompletion* c) {
1312 push(dpp, std::vector{ bl }, c);
f67539c2
TL
1313}
1314
b3b6e05e 1315int FIFO::push(const DoutPrefixProvider *dpp, const std::vector<cb::list>& data_bufs, optional_yield y)
f67539c2
TL
1316{
1317 std::unique_lock l(m);
1318 auto tid = ++next_tid;
1319 auto max_entry_size = info.params.max_entry_size;
1320 auto need_new_head = info.need_new_head();
39ae355f 1321 auto head_part_num = info.head_part_num;
f67539c2 1322 l.unlock();
b3b6e05e 1323 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1324 << " entering: tid=" << tid << dendl;
1325 if (data_bufs.empty()) {
b3b6e05e 1326 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1327 << " empty push, returning success tid=" << tid << dendl;
1328 return 0;
1329 }
1330
1331 // Validate sizes
1332 for (const auto& bl : data_bufs) {
1333 if (bl.length() > max_entry_size) {
b3b6e05e 1334 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1335 << " entry bigger than max_entry_size tid=" << tid << dendl;
1336 return -E2BIG;
1337 }
1338 }
1339
1340 int r = 0;
1341 if (need_new_head) {
b3b6e05e 1342 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 1343 << " need new head tid=" << tid << dendl;
39ae355f 1344 r = _prepare_new_head(dpp, head_part_num + 1, tid, y);
f67539c2 1345 if (r < 0) {
b3b6e05e 1346 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1347 << " _prepare_new_head failed: r=" << r
1348 << " tid=" << tid << dendl;
1349 return r;
1350 }
1351 }
1352
1353 std::deque<cb::list> remaining(data_bufs.begin(), data_bufs.end());
1354 std::deque<cb::list> batch;
1355
1356 uint64_t batch_len = 0;
1357 auto retries = 0;
1358 bool canceled = true;
1359 while ((!remaining.empty() || !batch.empty()) &&
1360 (retries <= MAX_RACE_RETRIES)) {
b3b6e05e 1361 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1362 << " preparing push: remaining=" << remaining.size()
1363 << " batch=" << batch.size() << " retries=" << retries
1364 << " tid=" << tid << dendl;
1365 std::unique_lock l(m);
39ae355f 1366 head_part_num = info.head_part_num;
f67539c2
TL
1367 auto max_part_size = info.params.max_part_size;
1368 auto overhead = part_entry_overhead;
1369 l.unlock();
1370
1371 while (!remaining.empty() &&
1372 (remaining.front().length() + batch_len <= max_part_size)) {
1373 /* We can send entries with data_len up to max_entry_size,
1374 however, we want to also account the overhead when
1375 dealing with multiple entries. Previous check doesn't
1376 account for overhead on purpose. */
1377 batch_len += remaining.front().length() + overhead;
1378 batch.push_back(std::move(remaining.front()));
1379 remaining.pop_front();
1380 }
b3b6e05e 1381 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1382 << " prepared push: remaining=" << remaining.size()
1383 << " batch=" << batch.size() << " retries=" << retries
1384 << " batch_len=" << batch_len
1385 << " tid=" << tid << dendl;
1386
b3b6e05e 1387 auto r = push_entries(dpp, batch, tid, y);
f67539c2
TL
1388 if (r == -ERANGE) {
1389 canceled = true;
1390 ++retries;
b3b6e05e 1391 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
39ae355f
TL
1392 << " need new head tid=" << tid << dendl;
1393 r = _prepare_new_head(dpp, head_part_num + 1, tid, y);
f67539c2 1394 if (r < 0) {
b3b6e05e 1395 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1396 << " prepare_new_head failed: r=" << r
1397 << " tid=" << tid << dendl;
1398 return r;
1399 }
1400 r = 0;
1401 continue;
1402 }
39ae355f
TL
1403 if (r == -ENOENT) {
1404 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1405 << " racing client trimmed part, rereading metadata "
1406 << "tid=" << tid << dendl;
1407 canceled = true;
1408 ++retries;
1409 r = read_meta(dpp, y);
1410 if (r < 0) {
1411 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1412 << " read_meta failed: r=" << r
1413 << " tid=" << tid << dendl;
1414 return r;
1415 }
1416 r = 0;
1417 continue;
1418 }
f67539c2 1419 if (r < 0) {
b3b6e05e 1420 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1421 << " push_entries failed: r=" << r
1422 << " tid=" << tid << dendl;
1423 return r;
1424 }
1425 // Made forward progress!
1426 canceled = false;
1427 retries = 0;
1428 batch_len = 0;
1429 if (static_cast<unsigned>(r) == batch.size()) {
1430 batch.clear();
1431 } else {
1432 batch.erase(batch.begin(), batch.begin() + r);
1433 for (const auto& b : batch) {
1434 batch_len += b.length() + part_entry_overhead;
1435 }
1436 }
1437 }
1438 if (canceled) {
b3b6e05e 1439 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1440 << " canceled too many times, giving up: tid=" << tid << dendl;
1441 return -ECANCELED;
1442 }
1443 return 0;
1444}
1445
1446struct Pusher : public Completion<Pusher> {
1447 FIFO* f;
1448 std::deque<cb::list> remaining;
1449 std::deque<cb::list> batch;
1450 int i = 0;
39ae355f 1451 std::int64_t head_part_num;
f67539c2 1452 std::uint64_t tid;
39ae355f 1453 enum { pushing, new_heading, meta_reading } state = pushing;
f67539c2 1454
20effc67 1455 void prep_then_push(const DoutPrefixProvider *dpp, Ptr&& p, const unsigned successes) {
f67539c2
TL
1456 std::unique_lock l(f->m);
1457 auto max_part_size = f->info.params.max_part_size;
1458 auto part_entry_overhead = f->part_entry_overhead;
39ae355f 1459 head_part_num = f->info.head_part_num;
f67539c2
TL
1460 l.unlock();
1461
20effc67 1462 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1463 << " preparing push: remaining=" << remaining.size()
1464 << " batch=" << batch.size() << " i=" << i
1465 << " tid=" << tid << dendl;
1466
1467 uint64_t batch_len = 0;
1468 if (successes > 0) {
1469 if (successes == batch.size()) {
1470 batch.clear();
1471 } else {
1472 batch.erase(batch.begin(), batch.begin() + successes);
1473 for (const auto& b : batch) {
1474 batch_len += b.length() + part_entry_overhead;
1475 }
1476 }
1477 }
1478
1479 if (batch.empty() && remaining.empty()) {
1480 complete(std::move(p), 0);
1481 return;
1482 }
1483
1484 while (!remaining.empty() &&
1485 (remaining.front().length() + batch_len <= max_part_size)) {
1486
1487 /* We can send entries with data_len up to max_entry_size,
1488 however, we want to also account the overhead when
1489 dealing with multiple entries. Previous check doesn't
1490 account for overhead on purpose. */
1491 batch_len += remaining.front().length() + part_entry_overhead;
1492 batch.push_back(std::move(remaining.front()));
1493 remaining.pop_front();
1494 }
20effc67 1495 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1496 << " prepared push: remaining=" << remaining.size()
1497 << " batch=" << batch.size() << " i=" << i
1498 << " batch_len=" << batch_len
1499 << " tid=" << tid << dendl;
1500 push(std::move(p));
1501 }
1502
1503 void push(Ptr&& p) {
1504 f->push_entries(batch, tid, call(std::move(p)));
1505 }
1506
b3b6e05e 1507 void new_head(const DoutPrefixProvider *dpp, Ptr&& p) {
39ae355f
TL
1508 state = new_heading;
1509 f->_prepare_new_head(dpp, head_part_num + 1, tid, call(std::move(p)));
1510 }
1511
1512 void read_meta(const DoutPrefixProvider *dpp, Ptr&& p) {
1513 ++i;
1514 state = meta_reading;
1515 f->read_meta(dpp, tid, call(std::move(p)));
f67539c2
TL
1516 }
1517
b3b6e05e 1518 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
39ae355f
TL
1519 switch (state) {
1520 case pushing:
f67539c2 1521 if (r == -ERANGE) {
b3b6e05e 1522 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 1523 << " need new head tid=" << tid << dendl;
b3b6e05e 1524 new_head(dpp, std::move(p));
f67539c2
TL
1525 return;
1526 }
39ae355f
TL
1527 if (r == -ENOENT) {
1528 if (i > MAX_RACE_RETRIES) {
1529 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1530 << " racing client deleted part, but we're out"
1531 << " of retries: tid=" << tid << dendl;
1532 complete(std::move(p), r);
1533 }
1534 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1535 << " racing client deleted part: tid=" << tid << dendl;
1536 read_meta(dpp, std::move(p));
1537 return;
1538 }
f67539c2 1539 if (r < 0) {
b3b6e05e 1540 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1541 << " push_entries failed: r=" << r
1542 << " tid=" << tid << dendl;
1543 complete(std::move(p), r);
1544 return;
1545 }
1546 i = 0; // We've made forward progress, so reset the race counter!
20effc67 1547 prep_then_push(dpp, std::move(p), r);
39ae355f
TL
1548 break;
1549
1550 case new_heading:
f67539c2 1551 if (r < 0) {
b3b6e05e 1552 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1553 << " prepare_new_head failed: r=" << r
1554 << " tid=" << tid << dendl;
1555 complete(std::move(p), r);
1556 return;
1557 }
39ae355f 1558 state = pushing;
20effc67 1559 handle_new_head(dpp, std::move(p), r);
39ae355f
TL
1560 break;
1561
1562 case meta_reading:
1563 if (r < 0) {
1564 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
1565 << " read_meta failed: r=" << r
1566 << " tid=" << tid << dendl;
1567 complete(std::move(p), r);
1568 return;
1569 }
1570 state = pushing;
1571 prep_then_push(dpp, std::move(p), r);
1572 break;
f67539c2
TL
1573 }
1574 }
1575
20effc67 1576 void handle_new_head(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
f67539c2
TL
1577 if (r == -ECANCELED) {
1578 if (p->i == MAX_RACE_RETRIES) {
20effc67 1579 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1580 << " canceled too many times, giving up: tid=" << tid << dendl;
1581 complete(std::move(p), -ECANCELED);
1582 return;
1583 }
1584 ++p->i;
1585 } else if (r) {
1586 complete(std::move(p), r);
1587 return;
1588 }
1589
1590 if (p->batch.empty()) {
20effc67 1591 prep_then_push(dpp, std::move(p), 0);
f67539c2
TL
1592 return;
1593 } else {
1594 push(std::move(p));
1595 return;
1596 }
1597 }
1598
b3b6e05e 1599 Pusher(const DoutPrefixProvider *dpp, FIFO* f, std::deque<cb::list>&& remaining,
39ae355f
TL
1600 std::int64_t head_part_num, std::uint64_t tid,
1601 lr::AioCompletion* super)
b3b6e05e 1602 : Completion(dpp, super), f(f), remaining(std::move(remaining)),
39ae355f 1603 head_part_num(head_part_num), tid(tid) {}
f67539c2
TL
1604};
1605
b3b6e05e 1606void FIFO::push(const DoutPrefixProvider *dpp, const std::vector<cb::list>& data_bufs,
f67539c2
TL
1607 lr::AioCompletion* c)
1608{
1609 std::unique_lock l(m);
1610 auto tid = ++next_tid;
1611 auto max_entry_size = info.params.max_entry_size;
1612 auto need_new_head = info.need_new_head();
39ae355f 1613 auto head_part_num = info.head_part_num;
f67539c2 1614 l.unlock();
b3b6e05e 1615 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 1616 << " entering: tid=" << tid << dendl;
b3b6e05e 1617 auto p = std::make_unique<Pusher>(dpp, this, std::deque<cb::list>(data_bufs.begin(), data_bufs.end()),
39ae355f 1618 head_part_num, tid, c);
f67539c2
TL
1619 // Validate sizes
1620 for (const auto& bl : data_bufs) {
1621 if (bl.length() > max_entry_size) {
b3b6e05e 1622 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1623 << " entry bigger than max_entry_size tid=" << tid << dendl;
1624 Pusher::complete(std::move(p), -E2BIG);
1625 return;
1626 }
1627 }
1628
1629 if (data_bufs.empty() ) {
b3b6e05e 1630 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1631 << " empty push, returning success tid=" << tid << dendl;
1632 Pusher::complete(std::move(p), 0);
1633 return;
1634 }
1635
1636 if (need_new_head) {
b3b6e05e 1637 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 1638 << " need new head tid=" << tid << dendl;
b3b6e05e 1639 p->new_head(dpp, std::move(p));
f67539c2 1640 } else {
20effc67 1641 p->prep_then_push(dpp, std::move(p), 0);
f67539c2
TL
1642 }
1643}
1644
b3b6e05e 1645int FIFO::list(const DoutPrefixProvider *dpp, int max_entries,
f67539c2
TL
1646 std::optional<std::string_view> markstr,
1647 std::vector<list_entry>* presult, bool* pmore,
1648 optional_yield y)
1649{
1650 std::unique_lock l(m);
1651 auto tid = ++next_tid;
1652 std::int64_t part_num = info.tail_part_num;
1653 l.unlock();
b3b6e05e 1654 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1655 << " entering: tid=" << tid << dendl;
1656 std::uint64_t ofs = 0;
1657 if (markstr) {
1658 auto marker = to_marker(*markstr);
1659 if (!marker) {
b3b6e05e 1660 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1661 << " invalid marker string: " << markstr
1662 << " tid= "<< tid << dendl;
1663 return -EINVAL;
1664 }
1665 part_num = marker->num;
1666 ofs = marker->ofs;
1667 }
1668
1669 std::vector<list_entry> result;
1670 result.reserve(max_entries);
1671 bool more = false;
1672
1673 std::vector<fifo::part_list_entry> entries;
1674 int r = 0;
1675 while (max_entries > 0) {
b3b6e05e 1676 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1677 << " max_entries=" << max_entries << " tid=" << tid << dendl;
1678 bool part_more = false;
1679 bool part_full = false;
1680
1681 std::unique_lock l(m);
1682 auto part_oid = info.part_oid(part_num);
1683 l.unlock();
1684
39ae355f
TL
1685 r = list_part(dpp, ioctx, part_oid, ofs, max_entries, &entries,
1686 &part_more, &part_full, tid, y);
f67539c2 1687 if (r == -ENOENT) {
b3b6e05e 1688 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1689 << " missing part, rereading metadata"
1690 << " tid= "<< tid << dendl;
b3b6e05e 1691 r = read_meta(dpp, tid, y);
f67539c2 1692 if (r < 0) {
b3b6e05e 1693 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1694 << " read_meta failed: r=" << r
1695 << " tid= "<< tid << dendl;
1696 return r;
1697 }
1698 if (part_num < info.tail_part_num) {
1699 /* raced with trim? restart */
b3b6e05e 1700 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1701 << " raced with trim, restarting: tid=" << tid << dendl;
1702 max_entries += result.size();
1703 result.clear();
1704 std::unique_lock l(m);
1705 part_num = info.tail_part_num;
1706 l.unlock();
1707 ofs = 0;
1708 continue;
1709 }
b3b6e05e 1710 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1711 << " assuming part was not written yet, so end of data: "
1712 << "tid=" << tid << dendl;
1713 more = false;
1714 r = 0;
1715 break;
1716 }
1717 if (r < 0) {
b3b6e05e 1718 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1719 << " list_entries failed: r=" << r
1720 << " tid= "<< tid << dendl;
1721 return r;
1722 }
1723 more = part_full || part_more;
1724 for (auto& entry : entries) {
1725 list_entry e;
1726 e.data = std::move(entry.data);
1727 e.marker = marker{part_num, entry.ofs}.to_string();
1728 e.mtime = entry.mtime;
1729 result.push_back(std::move(e));
1730 --max_entries;
1731 if (max_entries == 0)
1732 break;
1733 }
1734 entries.clear();
1735 if (max_entries > 0 &&
1736 part_more) {
1737 }
1738
1739 if (!part_full) {
b3b6e05e 1740 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1741 << " head part is not full, so we can assume we're done: "
1742 << "tid=" << tid << dendl;
1743 break;
1744 }
1745 if (!part_more) {
1746 ++part_num;
1747 ofs = 0;
1748 }
1749 }
1750 if (presult)
1751 *presult = std::move(result);
1752 if (pmore)
1753 *pmore = more;
1754 return 0;
1755}
1756
b3b6e05e 1757int FIFO::trim(const DoutPrefixProvider *dpp, std::string_view markstr, bool exclusive, optional_yield y)
f67539c2
TL
1758{
1759 bool overshoot = false;
1760 auto marker = to_marker(markstr);
1761 if (!marker) {
1762 return -EINVAL;
1763 }
1764 auto part_num = marker->num;
1765 auto ofs = marker->ofs;
1766 std::unique_lock l(m);
1767 auto tid = ++next_tid;
1768 auto hn = info.head_part_num;
1769 const auto max_part_size = info.params.max_part_size;
1770 if (part_num > hn) {
1771 l.unlock();
b3b6e05e 1772 auto r = read_meta(dpp, tid, y);
f67539c2
TL
1773 if (r < 0) {
1774 return r;
1775 }
1776 l.lock();
1777 auto hn = info.head_part_num;
1778 if (part_num > hn) {
1779 overshoot = true;
1780 part_num = hn;
1781 ofs = max_part_size;
1782 }
1783 }
1784 if (part_num < info.tail_part_num) {
1785 return -ENODATA;
1786 }
1787 auto pn = info.tail_part_num;
1788 l.unlock();
b3b6e05e 1789 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1790 << " entering: tid=" << tid << dendl;
1791
1792 int r = 0;
1793 while (pn < part_num) {
b3b6e05e 1794 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1795 << " pn=" << pn << " tid=" << tid << dendl;
1796 std::unique_lock l(m);
1797 l.unlock();
39ae355f 1798 r = trim_part(dpp, pn, max_part_size, false, tid, y);
f67539c2 1799 if (r < 0 && r == -ENOENT) {
b3b6e05e 1800 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1801 << " trim_part failed: r=" << r
1802 << " tid= "<< tid << dendl;
1803 return r;
1804 }
1805 ++pn;
1806 }
39ae355f 1807 r = trim_part(dpp, part_num, ofs, exclusive, tid, y);
f67539c2 1808 if (r < 0 && r != -ENOENT) {
b3b6e05e 1809 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1810 << " trim_part failed: r=" << r
1811 << " tid= "<< tid << dendl;
1812 return r;
1813 }
1814
1815 l.lock();
1816 auto tail_part_num = info.tail_part_num;
1817 auto objv = info.version;
1818 l.unlock();
1819 bool canceled = tail_part_num < part_num;
1820 int retries = 0;
1821 while ((tail_part_num < part_num) &&
1822 canceled &&
1823 (retries <= MAX_RACE_RETRIES)) {
b3b6e05e 1824 r = _update_meta(dpp, fifo::update{}.tail_part_num(part_num), objv, &canceled,
f67539c2
TL
1825 tid, y);
1826 if (r < 0) {
b3b6e05e 1827 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1828 << " _update_meta failed: r=" << r
1829 << " tid= "<< tid << dendl;
1830 return r;
1831 }
1832 if (canceled) {
b3b6e05e 1833 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1834 << " canceled: retries=" << retries
1835 << " tid=" << tid << dendl;
1836 l.lock();
1837 tail_part_num = info.tail_part_num;
1838 objv = info.version;
1839 l.unlock();
1840 ++retries;
1841 }
1842 }
1843 if (canceled) {
b3b6e05e 1844 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1845 << " canceled too many times, giving up: tid=" << tid << dendl;
1846 return -EIO;
1847 }
1848 return overshoot ? -ENODATA : 0;
1849}
1850
1851struct Trimmer : public Completion<Trimmer> {
1852 FIFO* fifo;
1853 std::int64_t part_num;
1854 std::uint64_t ofs;
1855 std::int64_t pn;
1856 bool exclusive;
1857 std::uint64_t tid;
1858 bool update = false;
1859 bool reread = false;
1860 bool canceled = false;
1861 bool overshoot = false;
1862 int retries = 0;
1863
b3b6e05e 1864 Trimmer(const DoutPrefixProvider *dpp, FIFO* fifo, std::int64_t part_num, std::uint64_t ofs, std::int64_t pn,
f67539c2 1865 bool exclusive, lr::AioCompletion* super, std::uint64_t tid)
b3b6e05e 1866 : Completion(dpp, super), fifo(fifo), part_num(part_num), ofs(ofs), pn(pn),
f67539c2
TL
1867 exclusive(exclusive), tid(tid) {}
1868
b3b6e05e
TL
1869 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
1870 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1871 << " entering: tid=" << tid << dendl;
1872
1873 if (reread) {
1874 reread = false;
1875 if (r < 0) {
b3b6e05e 1876 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1877 << " read_meta failed: r="
1878 << r << " tid=" << tid << dendl;
1879 complete(std::move(p), r);
1880 return;
1881 }
1882 std::unique_lock l(fifo->m);
1883 auto hn = fifo->info.head_part_num;
1884 const auto max_part_size = fifo->info.params.max_part_size;
1885 const auto tail_part_num = fifo->info.tail_part_num;
1886 l.unlock();
1887 if (part_num > hn) {
1888 part_num = hn;
1889 ofs = max_part_size;
1890 overshoot = true;
1891 }
1892 if (part_num < tail_part_num) {
1893 complete(std::move(p), -ENODATA);
1894 return;
1895 }
1896 pn = tail_part_num;
1897 if (pn < part_num) {
b3b6e05e 1898 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 1899 << " pn=" << pn << " tid=" << tid << dendl;
39ae355f
TL
1900 fifo->trim_part(dpp, pn++, max_part_size, false, tid,
1901 call(std::move(p)));
f67539c2
TL
1902 } else {
1903 update = true;
1904 canceled = tail_part_num < part_num;
39ae355f 1905 fifo->trim_part(dpp, part_num, ofs, exclusive, tid, call(std::move(p)));
f67539c2
TL
1906 }
1907 return;
1908 }
1909
1910 if (r == -ENOENT) {
1911 r = 0;
1912 }
1913
1914 if (r < 0) {
b3b6e05e 1915 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1916 << (update ? " update_meta " : " trim ") << "failed: r="
1917 << r << " tid=" << tid << dendl;
1918 complete(std::move(p), r);
1919 return;
1920 }
1921
1922 if (!update) {
b3b6e05e 1923 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1924 << " handling preceding trim callback: tid=" << tid << dendl;
1925 retries = 0;
1926 if (pn < part_num) {
b3b6e05e 1927 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1928 << " pn=" << pn << " tid=" << tid << dendl;
1929 std::unique_lock l(fifo->m);
1930 const auto max_part_size = fifo->info.params.max_part_size;
1931 l.unlock();
39ae355f
TL
1932 fifo->trim_part(dpp, pn++, max_part_size, false, tid,
1933 call(std::move(p)));
f67539c2
TL
1934 return;
1935 }
1936
1937 std::unique_lock l(fifo->m);
1938 const auto tail_part_num = fifo->info.tail_part_num;
1939 l.unlock();
1940 update = true;
1941 canceled = tail_part_num < part_num;
39ae355f 1942 fifo->trim_part(dpp, part_num, ofs, exclusive, tid, call(std::move(p)));
f67539c2
TL
1943 return;
1944 }
1945
b3b6e05e 1946 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1947 << " handling update-needed callback: tid=" << tid << dendl;
1948 std::unique_lock l(fifo->m);
1949 auto tail_part_num = fifo->info.tail_part_num;
1950 auto objv = fifo->info.version;
1951 l.unlock();
1952 if ((tail_part_num < part_num) &&
1953 canceled) {
1954 if (retries > MAX_RACE_RETRIES) {
b3b6e05e 1955 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1956 << " canceled too many times, giving up: tid=" << tid << dendl;
1957 complete(std::move(p), -EIO);
1958 return;
1959 }
1960 ++retries;
b3b6e05e 1961 fifo->_update_meta(dpp, fifo::update{}
f67539c2
TL
1962 .tail_part_num(part_num), objv, &canceled,
1963 tid, call(std::move(p)));
1964 } else {
1965 complete(std::move(p), overshoot ? -ENODATA : 0);
1966 }
1967 }
1968};
1969
b3b6e05e 1970void FIFO::trim(const DoutPrefixProvider *dpp, std::string_view markstr, bool exclusive,
f67539c2
TL
1971 lr::AioCompletion* c) {
1972 auto marker = to_marker(markstr);
1973 auto realmark = marker.value_or(::rgw::cls::fifo::marker{});
1974 std::unique_lock l(m);
1975 const auto hn = info.head_part_num;
1976 const auto max_part_size = info.params.max_part_size;
1977 const auto pn = info.tail_part_num;
1978 const auto part_oid = info.part_oid(pn);
1979 auto tid = ++next_tid;
1980 l.unlock();
b3b6e05e 1981 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2 1982 << " entering: tid=" << tid << dendl;
b3b6e05e 1983 auto trimmer = std::make_unique<Trimmer>(dpp, this, realmark.num, realmark.ofs,
f67539c2
TL
1984 pn, exclusive, c, tid);
1985 if (!marker) {
1986 Trimmer::complete(std::move(trimmer), -EINVAL);
1987 return;
1988 }
1989 ++trimmer->pn;
1990 auto ofs = marker->ofs;
1991 if (marker->num > hn) {
1992 trimmer->reread = true;
b3b6e05e 1993 read_meta(dpp, tid, Trimmer::call(std::move(trimmer)));
f67539c2
TL
1994 return;
1995 }
1996 if (pn < marker->num) {
b3b6e05e 1997 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
1998 << " pn=" << pn << " tid=" << tid << dendl;
1999 ofs = max_part_size;
2000 } else {
2001 trimmer->update = true;
2002 }
39ae355f 2003 trim_part(dpp, pn, ofs, exclusive, tid, Trimmer::call(std::move(trimmer)));
f67539c2
TL
2004}
2005
b3b6e05e 2006int FIFO::get_part_info(const DoutPrefixProvider *dpp, int64_t part_num,
f67539c2
TL
2007 fifo::part_header* header,
2008 optional_yield y)
2009{
2010 std::unique_lock l(m);
2011 const auto part_oid = info.part_oid(part_num);
2012 auto tid = ++next_tid;
2013 l.unlock();
b3b6e05e 2014 auto r = rgw::cls::fifo::get_part_info(dpp, ioctx, part_oid, header, tid, y);
f67539c2 2015 if (r < 0) {
b3b6e05e 2016 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2017 << " get_part_info failed: r="
2018 << r << " tid=" << tid << dendl;
2019 }
2020 return r;
2021}
2022
2023void FIFO::get_part_info(int64_t part_num,
2024 fifo::part_header* header,
2025 lr::AioCompletion* c)
2026{
2027 std::unique_lock l(m);
2028 const auto part_oid = info.part_oid(part_num);
2029 auto tid = ++next_tid;
2030 l.unlock();
2031 auto op = rgw::cls::fifo::get_part_info(cct, header, tid);
2032 auto r = ioctx.aio_operate(part_oid, c, &op, nullptr);
2033 ceph_assert(r >= 0);
2034}
2035
2036struct InfoGetter : Completion<InfoGetter> {
2037 FIFO* fifo;
2038 fifo::part_header header;
2039 fu2::function<void(int r, fifo::part_header&&)> f;
2040 std::uint64_t tid;
2041 bool headerread = false;
2042
b3b6e05e 2043 InfoGetter(const DoutPrefixProvider *dpp, FIFO* fifo, fu2::function<void(int r, fifo::part_header&&)> f,
f67539c2 2044 std::uint64_t tid, lr::AioCompletion* super)
b3b6e05e
TL
2045 : Completion(dpp, super), fifo(fifo), f(std::move(f)), tid(tid) {}
2046 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
f67539c2
TL
2047 if (!headerread) {
2048 if (r < 0) {
b3b6e05e 2049 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2050 << " read_meta failed: r="
2051 << r << " tid=" << tid << dendl;
2052 if (f)
2053 f(r, {});
2054 complete(std::move(p), r);
2055 return;
2056 }
2057
2058 auto info = fifo->meta();
2059 auto hpn = info.head_part_num;
2060 if (hpn < 0) {
b3b6e05e 2061 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2062 << " no head, returning empty partinfo r="
2063 << r << " tid=" << tid << dendl;
2064 if (f)
2065 f(0, {});
2066 complete(std::move(p), r);
2067 return;
2068 }
2069 headerread = true;
2070 auto op = rgw::cls::fifo::get_part_info(fifo->cct, &header, tid);
2071 std::unique_lock l(fifo->m);
2072 auto oid = fifo->info.part_oid(hpn);
2073 l.unlock();
2074 r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op,
2075 nullptr);
2076 ceph_assert(r >= 0);
2077 return;
2078 }
2079
2080 if (r < 0) {
b3b6e05e 2081 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2082 << " get_part_info failed: r="
2083 << r << " tid=" << tid << dendl;
2084 }
2085
2086 if (f)
2087 f(r, std::move(header));
2088 complete(std::move(p), r);
2089 return;
2090 }
2091};
2092
b3b6e05e 2093void FIFO::get_head_info(const DoutPrefixProvider *dpp, fu2::unique_function<void(int r,
f67539c2
TL
2094 fifo::part_header&&)> f,
2095 lr::AioCompletion* c)
2096{
2097 std::unique_lock l(m);
2098 auto tid = ++next_tid;
2099 l.unlock();
b3b6e05e
TL
2100 auto ig = std::make_unique<InfoGetter>(dpp, this, std::move(f), tid, c);
2101 read_meta(dpp, tid, InfoGetter::call(std::move(ig)));
f67539c2
TL
2102}
2103
2104struct JournalProcessor : public Completion<JournalProcessor> {
2105private:
2106 FIFO* const fifo;
2107
2108 std::vector<fifo::journal_entry> processed;
39ae355f
TL
2109 decltype(fifo->info.journal) journal;
2110 decltype(journal)::iterator iter;
f67539c2
TL
2111 std::int64_t new_tail;
2112 std::int64_t new_head;
2113 std::int64_t new_max;
2114 int race_retries = 0;
2115 bool first_pp = true;
2116 bool canceled = false;
2117 std::uint64_t tid;
2118
2119 enum {
2120 entry_callback,
2121 pp_callback,
2122 } state;
2123
39ae355f 2124 void create_part(const DoutPrefixProvider *dpp, Ptr&& p, int64_t part_num) {
20effc67 2125 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2126 << " entering: tid=" << tid << dendl;
2127 state = entry_callback;
2128 lr::ObjectWriteOperation op;
2129 op.create(false); /* We don't need exclusivity, part_init ensures
2130 we're creating from the same journal entry. */
2131 std::unique_lock l(fifo->m);
39ae355f 2132 part_init(&op, fifo->info.params);
f67539c2
TL
2133 auto oid = fifo->info.part_oid(part_num);
2134 l.unlock();
2135 auto r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op);
2136 ceph_assert(r >= 0);
2137 return;
2138 }
2139
39ae355f 2140 void remove_part(const DoutPrefixProvider *dpp, Ptr&& p, int64_t part_num) {
20effc67 2141 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2142 << " entering: tid=" << tid << dendl;
2143 state = entry_callback;
2144 lr::ObjectWriteOperation op;
2145 op.remove();
2146 std::unique_lock l(fifo->m);
2147 auto oid = fifo->info.part_oid(part_num);
2148 l.unlock();
2149 auto r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op);
2150 ceph_assert(r >= 0);
2151 return;
2152 }
2153
b3b6e05e 2154 void finish_je(const DoutPrefixProvider *dpp, Ptr&& p, int r,
f67539c2 2155 const fifo::journal_entry& entry) {
b3b6e05e 2156 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2157 << " entering: tid=" << tid << dendl;
2158
b3b6e05e 2159 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2160 << " finishing entry: entry=" << entry
2161 << " tid=" << tid << dendl;
2162
2163 if (entry.op == fifo::journal_entry::Op::remove && r == -ENOENT)
2164 r = 0;
2165
2166 if (r < 0) {
b3b6e05e 2167 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2168 << " processing entry failed: entry=" << entry
2169 << " r=" << r << " tid=" << tid << dendl;
2170 complete(std::move(p), r);
2171 return;
2172 } else {
2173 switch (entry.op) {
2174 case fifo::journal_entry::Op::unknown:
2175 case fifo::journal_entry::Op::set_head:
2176 // Can't happen. Filtered out in process.
2177 complete(std::move(p), -EIO);
2178 return;
2179
2180 case fifo::journal_entry::Op::create:
2181 if (entry.part_num > new_max) {
2182 new_max = entry.part_num;
2183 }
2184 break;
2185 case fifo::journal_entry::Op::remove:
2186 if (entry.part_num >= new_tail) {
2187 new_tail = entry.part_num + 1;
2188 }
2189 break;
2190 }
2191 processed.push_back(entry);
2192 }
2193 ++iter;
b3b6e05e 2194 process(dpp, std::move(p));
f67539c2
TL
2195 }
2196
b3b6e05e
TL
2197 void postprocess(const DoutPrefixProvider *dpp, Ptr&& p) {
2198 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2199 << " entering: tid=" << tid << dendl;
2200 if (processed.empty()) {
b3b6e05e 2201 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2202 << " nothing to update any more: race_retries="
2203 << race_retries << " tid=" << tid << dendl;
2204 complete(std::move(p), 0);
2205 return;
2206 }
b3b6e05e 2207 pp_run(dpp, std::move(p), 0, false);
f67539c2
TL
2208 }
2209
2210public:
2211
b3b6e05e
TL
2212 JournalProcessor(const DoutPrefixProvider *dpp, FIFO* fifo, std::uint64_t tid, lr::AioCompletion* super)
2213 : Completion(dpp, super), fifo(fifo), tid(tid) {
f67539c2
TL
2214 std::unique_lock l(fifo->m);
2215 journal = fifo->info.journal;
2216 iter = journal.begin();
2217 new_tail = fifo->info.tail_part_num;
2218 new_head = fifo->info.head_part_num;
2219 new_max = fifo->info.max_push_part_num;
2220 }
2221
b3b6e05e
TL
2222 void pp_run(const DoutPrefixProvider *dpp, Ptr&& p, int r, bool canceled) {
2223 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2224 << " entering: tid=" << tid << dendl;
2225 std::optional<int64_t> tail_part_num;
2226 std::optional<int64_t> head_part_num;
2227 std::optional<int64_t> max_part_num;
2228
2229 if (r < 0) {
b3b6e05e 2230 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2231 << " failed, r=: " << r << " tid=" << tid << dendl;
2232 complete(std::move(p), r);
2233 }
2234
2235
b3b6e05e 2236 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2237 << " postprocessing: race_retries="
2238 << race_retries << " tid=" << tid << dendl;
2239
2240 if (!first_pp && r == 0 && !canceled) {
b3b6e05e 2241 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2242 << " nothing to update any more: race_retries="
2243 << race_retries << " tid=" << tid << dendl;
2244 complete(std::move(p), 0);
2245 return;
2246 }
2247
2248 first_pp = false;
2249
2250 if (canceled) {
2251 if (race_retries >= MAX_RACE_RETRIES) {
b3b6e05e 2252 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2253 << " canceled too many times, giving up: tid="
2254 << tid << dendl;
2255 complete(std::move(p), -ECANCELED);
2256 return;
2257 }
b3b6e05e 2258 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2259 << " update canceled, retrying: race_retries="
2260 << race_retries << " tid=" << tid << dendl;
2261
2262 ++race_retries;
2263
2264 std::vector<fifo::journal_entry> new_processed;
2265 std::unique_lock l(fifo->m);
2266 for (auto& e : processed) {
39ae355f
TL
2267 if (fifo->info.journal.contains(e)) {
2268 new_processed.push_back(e);
f67539c2 2269 }
f67539c2
TL
2270 }
2271 processed = std::move(new_processed);
2272 }
2273
2274 std::unique_lock l(fifo->m);
2275 auto objv = fifo->info.version;
2276 if (new_tail > fifo->info.tail_part_num) {
2277 tail_part_num = new_tail;
2278 }
2279
2280 if (new_head > fifo->info.head_part_num) {
2281 head_part_num = new_head;
2282 }
2283
2284 if (new_max > fifo->info.max_push_part_num) {
2285 max_part_num = new_max;
2286 }
2287 l.unlock();
2288
2289 if (processed.empty() &&
2290 !tail_part_num &&
2291 !max_part_num) {
2292 /* nothing to update anymore */
b3b6e05e 2293 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2294 << " nothing to update any more: race_retries="
2295 << race_retries << " tid=" << tid << dendl;
2296 complete(std::move(p), 0);
2297 return;
2298 }
2299 state = pp_callback;
b3b6e05e 2300 fifo->_update_meta(dpp, fifo::update{}
f67539c2
TL
2301 .tail_part_num(tail_part_num)
2302 .head_part_num(head_part_num)
2303 .max_push_part_num(max_part_num)
2304 .journal_entries_rm(processed),
2305 objv, &this->canceled, tid, call(std::move(p)));
2306 return;
2307 }
2308
2309 JournalProcessor(const JournalProcessor&) = delete;
2310 JournalProcessor& operator =(const JournalProcessor&) = delete;
2311 JournalProcessor(JournalProcessor&&) = delete;
2312 JournalProcessor& operator =(JournalProcessor&&) = delete;
2313
b3b6e05e
TL
2314 void process(const DoutPrefixProvider *dpp, Ptr&& p) {
2315 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2316 << " entering: tid=" << tid << dendl;
2317 while (iter != journal.end()) {
b3b6e05e 2318 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2319 << " processing entry: entry=" << *iter
2320 << " tid=" << tid << dendl;
39ae355f 2321 const auto entry = *iter;
f67539c2
TL
2322 switch (entry.op) {
2323 case fifo::journal_entry::Op::create:
39ae355f 2324 create_part(dpp, std::move(p), entry.part_num);
f67539c2
TL
2325 return;
2326 case fifo::journal_entry::Op::set_head:
2327 if (entry.part_num > new_head) {
2328 new_head = entry.part_num;
2329 }
2330 processed.push_back(entry);
2331 ++iter;
2332 continue;
2333 case fifo::journal_entry::Op::remove:
39ae355f 2334 remove_part(dpp, std::move(p), entry.part_num);
f67539c2
TL
2335 return;
2336 default:
20effc67 2337 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2338 << " unknown journaled op: entry=" << entry << " tid="
2339 << tid << dendl;
2340 complete(std::move(p), -EIO);
2341 return;
2342 }
2343 }
b3b6e05e 2344 postprocess(dpp, std::move(p));
f67539c2
TL
2345 return;
2346 }
2347
b3b6e05e
TL
2348 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
2349 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
f67539c2
TL
2350 << " entering: tid=" << tid << dendl;
2351 switch (state) {
2352 case entry_callback:
39ae355f 2353 finish_je(dpp, std::move(p), r, *iter);
f67539c2
TL
2354 return;
2355 case pp_callback:
2356 auto c = canceled;
2357 canceled = false;
b3b6e05e 2358 pp_run(dpp, std::move(p), r, c);
f67539c2
TL
2359 return;
2360 }
2361
2362 abort();
2363 }
2364
2365};
2366
b3b6e05e
TL
2367void FIFO::process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c) {
2368 auto p = std::make_unique<JournalProcessor>(dpp, this, tid, c);
2369 p->process(dpp, std::move(p));
f67539c2
TL
2370}
2371
2372struct Lister : Completion<Lister> {
2373 FIFO* f;
2374 std::vector<list_entry> result;
2375 bool more = false;
2376 std::int64_t part_num;
2377 std::uint64_t ofs;
2378 int max_entries;
2379 int r_out = 0;
2380 std::vector<fifo::part_list_entry> entries;
2381 bool part_more = false;
2382 bool part_full = false;
2383 std::vector<list_entry>* entries_out;
2384 bool* more_out;
2385 std::uint64_t tid;
2386
2387 bool read = false;
2388
2389 void complete(Ptr&& p, int r) {
2390 if (r >= 0) {
2391 if (more_out) *more_out = more;
2392 if (entries_out) *entries_out = std::move(result);
2393 }
2394 Completion::complete(std::move(p), r);
2395 }
2396
2397public:
b3b6e05e 2398 Lister(const DoutPrefixProvider *dpp, FIFO* f, std::int64_t part_num, std::uint64_t ofs, int max_entries,
f67539c2
TL
2399 std::vector<list_entry>* entries_out, bool* more_out,
2400 std::uint64_t tid, lr::AioCompletion* super)
b3b6e05e 2401 : Completion(dpp, super), f(f), part_num(part_num), ofs(ofs), max_entries(max_entries),
f67539c2
TL
2402 entries_out(entries_out), more_out(more_out), tid(tid) {
2403 result.reserve(max_entries);
2404 }
2405
2406 Lister(const Lister&) = delete;
2407 Lister& operator =(const Lister&) = delete;
2408 Lister(Lister&&) = delete;
2409 Lister& operator =(Lister&&) = delete;
2410
b3b6e05e 2411 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
f67539c2
TL
2412 if (read)
2413 handle_read(std::move(p), r);
2414 else
b3b6e05e 2415 handle_list(dpp, std::move(p), r);
f67539c2
TL
2416 }
2417
2418 void list(Ptr&& p) {
2419 if (max_entries > 0) {
2420 part_more = false;
2421 part_full = false;
2422 entries.clear();
2423
2424 std::unique_lock l(f->m);
2425 auto part_oid = f->info.part_oid(part_num);
2426 l.unlock();
2427
2428 read = false;
39ae355f
TL
2429 auto op = list_part(f->cct, ofs, max_entries, &r_out,
2430 &entries, &part_more, &part_full, tid);
f67539c2
TL
2431 f->ioctx.aio_operate(part_oid, call(std::move(p)), &op, nullptr);
2432 } else {
2433 complete(std::move(p), 0);
2434 }
2435 }
2436
2437 void handle_read(Ptr&& p, int r) {
2438 read = false;
2439 if (r >= 0) r = r_out;
2440 r_out = 0;
2441
2442 if (r < 0) {
2443 complete(std::move(p), r);
2444 return;
2445 }
2446
2447 if (part_num < f->info.tail_part_num) {
2448 /* raced with trim? restart */
2449 max_entries += result.size();
2450 result.clear();
2451 part_num = f->info.tail_part_num;
2452 ofs = 0;
2453 list(std::move(p));
2454 return;
2455 }
2456 /* assuming part was not written yet, so end of data */
2457 more = false;
2458 complete(std::move(p), 0);
2459 return;
2460 }
2461
b3b6e05e 2462 void handle_list(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
f67539c2
TL
2463 if (r >= 0) r = r_out;
2464 r_out = 0;
2465 std::unique_lock l(f->m);
2466 auto part_oid = f->info.part_oid(part_num);
2467 l.unlock();
2468 if (r == -ENOENT) {
2469 read = true;
b3b6e05e 2470 f->read_meta(dpp, tid, call(std::move(p)));
f67539c2
TL
2471 return;
2472 }
2473 if (r < 0) {
2474 complete(std::move(p), r);
2475 return;
2476 }
2477
2478 more = part_full || part_more;
2479 for (auto& entry : entries) {
2480 list_entry e;
2481 e.data = std::move(entry.data);
2482 e.marker = marker{part_num, entry.ofs}.to_string();
2483 e.mtime = entry.mtime;
2484 result.push_back(std::move(e));
2485 }
2486 max_entries -= entries.size();
2487 entries.clear();
2488 if (max_entries > 0 && part_more) {
2489 list(std::move(p));
2490 return;
2491 }
2492
2493 if (!part_full) { /* head part is not full */
2494 complete(std::move(p), 0);
2495 return;
2496 }
2497 ++part_num;
2498 ofs = 0;
2499 list(std::move(p));
2500 }
2501};
2502
b3b6e05e 2503void FIFO::list(const DoutPrefixProvider *dpp, int max_entries,
f67539c2
TL
2504 std::optional<std::string_view> markstr,
2505 std::vector<list_entry>* out,
2506 bool* more,
2507 lr::AioCompletion* c) {
2508 std::unique_lock l(m);
2509 auto tid = ++next_tid;
2510 std::int64_t part_num = info.tail_part_num;
2511 l.unlock();
2512 std::uint64_t ofs = 0;
2513 std::optional<::rgw::cls::fifo::marker> marker;
2514
2515 if (markstr) {
2516 marker = to_marker(*markstr);
2517 if (marker) {
2518 part_num = marker->num;
2519 ofs = marker->ofs;
2520 }
2521 }
2522
b3b6e05e 2523 auto ls = std::make_unique<Lister>(dpp, this, part_num, ofs, max_entries, out,
f67539c2
TL
2524 more, tid, c);
2525 if (markstr && !marker) {
2526 auto l = ls.get();
2527 l->complete(std::move(ls), -EINVAL);
2528 } else {
2529 ls->list(std::move(ls));
2530 }
2531}
2532}