]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/cls_fifo_legacy.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rgw / cls_fifo_legacy.cc
CommitLineData
f67539c2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2020 Red Hat <contact@redhat.com>
7 * Author: Adam C. Emerson
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16#include <cstdint>
17#include <numeric>
18#include <optional>
19#include <string_view>
20
21#undef FMT_HEADER_ONLY
22#define FMT_HEADER_ONLY 1
23#include <fmt/format.h>
24
25#include "include/rados/librados.hpp"
26
27#include "include/buffer.h"
28
29#include "common/async/yield_context.h"
30#include "common/random_string.h"
31
32#include "cls/fifo/cls_fifo_types.h"
33#include "cls/fifo/cls_fifo_ops.h"
34
35#include "cls_fifo_legacy.h"
36
37namespace rgw::cls::fifo {
38static constexpr auto dout_subsys = ceph_subsys_objclass;
39namespace cb = ceph::buffer;
40namespace fifo = rados::cls::fifo;
41
42using ceph::from_error_code;
43
44inline constexpr auto MAX_RACE_RETRIES = 10;
45
46void create_meta(lr::ObjectWriteOperation* op,
47 std::string_view id,
48 std::optional<fifo::objv> objv,
49 std::optional<std::string_view> oid_prefix,
50 bool exclusive,
51 std::uint64_t max_part_size,
52 std::uint64_t max_entry_size)
53{
54 fifo::op::create_meta cm;
55
56 cm.id = id;
57 cm.version = objv;
58 cm.oid_prefix = oid_prefix;
59 cm.max_part_size = max_part_size;
60 cm.max_entry_size = max_entry_size;
61 cm.exclusive = exclusive;
62
63 cb::list in;
64 encode(cm, in);
65 op->exec(fifo::op::CLASS, fifo::op::CREATE_META, in);
66}
67
68int get_meta(lr::IoCtx& ioctx, const std::string& oid,
69 std::optional<fifo::objv> objv, fifo::info* info,
70 std::uint32_t* part_header_size,
71 std::uint32_t* part_entry_overhead,
72 uint64_t tid, optional_yield y,
73 bool probe)
74{
75 lr::ObjectReadOperation op;
76 fifo::op::get_meta gm;
77 gm.version = objv;
78 cb::list in;
79 encode(gm, in);
80 cb::list bl;
81
82 op.exec(fifo::op::CLASS, fifo::op::GET_META, in,
83 &bl, nullptr);
84 auto r = rgw_rados_operate(ioctx, oid, &op, nullptr, y);
85 if (r >= 0) try {
86 fifo::op::get_meta_reply reply;
87 auto iter = bl.cbegin();
88 decode(reply, iter);
89 if (info) *info = std::move(reply.info);
90 if (part_header_size) *part_header_size = reply.part_header_size;
91 if (part_entry_overhead)
92 *part_entry_overhead = reply.part_entry_overhead;
93 } catch (const cb::error& err) {
94 lderr(static_cast<CephContext*>(ioctx.cct()))
95 << __PRETTY_FUNCTION__ << ":" << __LINE__
96 << " decode failed: " << err.what()
97 << " tid=" << tid << dendl;
98 r = from_error_code(err.code());
99 } else if (!(probe && (r == -ENOENT || r == -ENODATA))) {
100 lderr(static_cast<CephContext*>(ioctx.cct()))
101 << __PRETTY_FUNCTION__ << ":" << __LINE__
102 << " fifo::op::GET_META failed r=" << r << " tid=" << tid
103 << dendl;
104 }
105 return r;
106};
107
108namespace {
109void update_meta(lr::ObjectWriteOperation* op, const fifo::objv& objv,
110 const fifo::update& update)
111{
112 fifo::op::update_meta um;
113
114 um.version = objv;
115 um.tail_part_num = update.tail_part_num();
116 um.head_part_num = update.head_part_num();
117 um.min_push_part_num = update.min_push_part_num();
118 um.max_push_part_num = update.max_push_part_num();
119 um.journal_entries_add = std::move(update).journal_entries_add();
120 um.journal_entries_rm = std::move(update).journal_entries_rm();
121
122 cb::list in;
123 encode(um, in);
124 op->exec(fifo::op::CLASS, fifo::op::UPDATE_META, in);
125}
126
127void part_init(lr::ObjectWriteOperation* op, std::string_view tag,
128 fifo::data_params params)
129{
130 fifo::op::init_part ip;
131
132 ip.tag = tag;
133 ip.params = params;
134
135 cb::list in;
136 encode(ip, in);
137 op->exec(fifo::op::CLASS, fifo::op::INIT_PART, in);
138}
139
140int push_part(lr::IoCtx& ioctx, const std::string& oid, std::string_view tag,
141 std::deque<cb::list> data_bufs, std::uint64_t tid,
142 optional_yield y)
143{
144 lr::ObjectWriteOperation op;
145 fifo::op::push_part pp;
146
147 pp.tag = tag;
148 pp.data_bufs = data_bufs;
149 pp.total_len = 0;
150
151 for (const auto& bl : data_bufs)
152 pp.total_len += bl.length();
153
154 cb::list in;
155 encode(pp, in);
156 auto retval = 0;
157 op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in, nullptr, &retval);
158 auto r = rgw_rados_operate(ioctx, oid, &op, y, lr::OPERATION_RETURNVEC);
159 if (r < 0) {
160 lderr(static_cast<CephContext*>(ioctx.cct()))
161 << __PRETTY_FUNCTION__ << ":" << __LINE__
162 << " fifo::op::PUSH_PART failed r=" << r
163 << " tid=" << tid << dendl;
164 return r;
165 }
166 if (retval < 0) {
167 lderr(static_cast<CephContext*>(ioctx.cct()))
168 << __PRETTY_FUNCTION__ << ":" << __LINE__
169 << " error handling response retval=" << retval
170 << " tid=" << tid << dendl;
171 }
172 return retval;
173}
174
175void push_part(lr::IoCtx& ioctx, const std::string& oid, std::string_view tag,
176 std::deque<cb::list> data_bufs, std::uint64_t tid,
177 lr::AioCompletion* c)
178{
179 lr::ObjectWriteOperation op;
180 fifo::op::push_part pp;
181
182 pp.tag = tag;
183 pp.data_bufs = data_bufs;
184 pp.total_len = 0;
185
186 for (const auto& bl : data_bufs)
187 pp.total_len += bl.length();
188
189 cb::list in;
190 encode(pp, in);
191 op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in);
192 auto r = ioctx.aio_operate(oid, c, &op, lr::OPERATION_RETURNVEC);
193 ceph_assert(r >= 0);
194}
195
196void trim_part(lr::ObjectWriteOperation* op,
197 std::optional<std::string_view> tag,
198 std::uint64_t ofs, bool exclusive)
199{
200 fifo::op::trim_part tp;
201
202 tp.tag = tag;
203 tp.ofs = ofs;
204 tp.exclusive = exclusive;
205
206 cb::list in;
207 encode(tp, in);
208 op->exec(fifo::op::CLASS, fifo::op::TRIM_PART, in);
209}
210
211int list_part(lr::IoCtx& ioctx, const std::string& oid,
212 std::optional<std::string_view> tag, std::uint64_t ofs,
213 std::uint64_t max_entries,
214 std::vector<fifo::part_list_entry>* entries,
215 bool* more, bool* full_part, std::string* ptag,
216 std::uint64_t tid, optional_yield y)
217{
218 lr::ObjectReadOperation op;
219 fifo::op::list_part lp;
220
221 lp.tag = tag;
222 lp.ofs = ofs;
223 lp.max_entries = max_entries;
224
225 cb::list in;
226 encode(lp, in);
227 cb::list bl;
228 op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in, &bl, nullptr);
229 auto r = rgw_rados_operate(ioctx, oid, &op, nullptr, y);
230 if (r >= 0) try {
231 fifo::op::list_part_reply reply;
232 auto iter = bl.cbegin();
233 decode(reply, iter);
234 if (entries) *entries = std::move(reply.entries);
235 if (more) *more = reply.more;
236 if (full_part) *full_part = reply.full_part;
237 if (ptag) *ptag = reply.tag;
238 } catch (const cb::error& err) {
239 lderr(static_cast<CephContext*>(ioctx.cct()))
240 << __PRETTY_FUNCTION__ << ":" << __LINE__
241 << " decode failed: " << err.what()
242 << " tid=" << tid << dendl;
243 r = from_error_code(err.code());
244 } else if (r != -ENOENT) {
245 lderr(static_cast<CephContext*>(ioctx.cct()))
246 << __PRETTY_FUNCTION__ << ":" << __LINE__
247 << " fifo::op::LIST_PART failed r=" << r << " tid=" << tid
248 << dendl;
249 }
250 return r;
251}
252
253struct list_entry_completion : public lr::ObjectOperationCompletion {
254 CephContext* cct;
255 int* r_out;
256 std::vector<fifo::part_list_entry>* entries;
257 bool* more;
258 bool* full_part;
259 std::string* ptag;
260 std::uint64_t tid;
261
262 list_entry_completion(CephContext* cct, int* r_out, std::vector<fifo::part_list_entry>* entries,
263 bool* more, bool* full_part, std::string* ptag,
264 std::uint64_t tid)
265 : cct(cct), r_out(r_out), entries(entries), more(more),
266 full_part(full_part), ptag(ptag), tid(tid) {}
267 virtual ~list_entry_completion() = default;
268 void handle_completion(int r, bufferlist& bl) override {
269 if (r >= 0) try {
270 fifo::op::list_part_reply reply;
271 auto iter = bl.cbegin();
272 decode(reply, iter);
273 if (entries) *entries = std::move(reply.entries);
274 if (more) *more = reply.more;
275 if (full_part) *full_part = reply.full_part;
276 if (ptag) *ptag = reply.tag;
277 } catch (const cb::error& err) {
278 lderr(cct)
279 << __PRETTY_FUNCTION__ << ":" << __LINE__
280 << " decode failed: " << err.what()
281 << " tid=" << tid << dendl;
282 r = from_error_code(err.code());
283 } else if (r < 0) {
284 lderr(cct)
285 << __PRETTY_FUNCTION__ << ":" << __LINE__
286 << " fifo::op::LIST_PART failed r=" << r << " tid=" << tid
287 << dendl;
288 }
289 if (r_out) *r_out = r;
290 }
291};
292
293lr::ObjectReadOperation list_part(CephContext* cct,
294 std::optional<std::string_view> tag,
295 std::uint64_t ofs,
296 std::uint64_t max_entries,
297 int* r_out,
298 std::vector<fifo::part_list_entry>* entries,
299 bool* more, bool* full_part,
300 std::string* ptag, std::uint64_t tid)
301{
302 lr::ObjectReadOperation op;
303 fifo::op::list_part lp;
304
305 lp.tag = tag;
306 lp.ofs = ofs;
307 lp.max_entries = max_entries;
308
309 cb::list in;
310 encode(lp, in);
311 op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in,
312 new list_entry_completion(cct, r_out, entries, more, full_part,
313 ptag, tid));
314 return op;
315}
316
317int get_part_info(lr::IoCtx& ioctx, const std::string& oid,
318 fifo::part_header* header,
319 std::uint64_t tid, optional_yield y)
320{
321 lr::ObjectReadOperation op;
322 fifo::op::get_part_info gpi;
323
324 cb::list in;
325 cb::list bl;
326 encode(gpi, in);
327 op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in, &bl, nullptr);
328 auto r = rgw_rados_operate(ioctx, oid, &op, nullptr, y);
329 if (r >= 0) try {
330 fifo::op::get_part_info_reply reply;
331 auto iter = bl.cbegin();
332 decode(reply, iter);
333 if (header) *header = std::move(reply.header);
334 } catch (const cb::error& err) {
335 lderr(static_cast<CephContext*>(ioctx.cct()))
336 << __PRETTY_FUNCTION__ << ":" << __LINE__
337 << " decode failed: " << err.what()
338 << " tid=" << tid << dendl;
339 r = from_error_code(err.code());
340 } else {
341 lderr(static_cast<CephContext*>(ioctx.cct()))
342 << __PRETTY_FUNCTION__ << ":" << __LINE__
343 << " fifo::op::GET_PART_INFO failed r=" << r << " tid=" << tid
344 << dendl;
345 }
346 return r;
347}
348
349struct partinfo_completion : public lr::ObjectOperationCompletion {
350 CephContext* cct;
351 int* rp;
352 fifo::part_header* h;
353 std::uint64_t tid;
354 partinfo_completion(CephContext* cct, int* rp, fifo::part_header* h,
355 std::uint64_t tid) :
356 cct(cct), rp(rp), h(h), tid(tid) {
357 }
358 virtual ~partinfo_completion() = default;
359 void handle_completion(int r, bufferlist& bl) override {
360 if (r >= 0) try {
361 fifo::op::get_part_info_reply reply;
362 auto iter = bl.cbegin();
363 decode(reply, iter);
364 if (h) *h = std::move(reply.header);
365 } catch (const cb::error& err) {
366 r = from_error_code(err.code());
367 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
368 << " decode failed: " << err.what()
369 << " tid=" << tid << dendl;
370 } else {
371 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
372 << " fifo::op::GET_PART_INFO failed r=" << r << " tid=" << tid
373 << dendl;
374 }
375 if (rp) {
376 *rp = r;
377 }
378 }
379};
380
381lr::ObjectReadOperation get_part_info(CephContext* cct,
382 fifo::part_header* header,
383 std::uint64_t tid, int* r = 0)
384{
385 lr::ObjectReadOperation op;
386 fifo::op::get_part_info gpi;
387
388 cb::list in;
389 cb::list bl;
390 encode(gpi, in);
391 op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in,
392 new partinfo_completion(cct, r, header, tid));
393 return op;
394}
395}
396
397std::optional<marker> FIFO::to_marker(std::string_view s)
398{
399 marker m;
400 if (s.empty()) {
401 m.num = info.tail_part_num;
402 m.ofs = 0;
403 return m;
404 }
405
406 auto pos = s.find(':');
407 if (pos == string::npos) {
408 return std::nullopt;
409 }
410
411 auto num = s.substr(0, pos);
412 auto ofs = s.substr(pos + 1);
413
414 auto n = ceph::parse<decltype(m.num)>(num);
415 if (!n) {
416 return std::nullopt;
417 }
418 m.num = *n;
419 auto o = ceph::parse<decltype(m.ofs)>(ofs);
420 if (!o) {
421 return std::nullopt;
422 }
423 m.ofs = *o;
424 return m;
425}
426
427std::string FIFO::generate_tag() const
428{
429 static constexpr auto HEADER_TAG_SIZE = 16;
430 return gen_rand_alphanumeric_plain(static_cast<CephContext*>(ioctx.cct()),
431 HEADER_TAG_SIZE);
432}
433
434
435int FIFO::apply_update(fifo::info* info,
436 const fifo::objv& objv,
437 const fifo::update& update,
438 std::uint64_t tid)
439{
440 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
441 << " entering: tid=" << tid << dendl;
442 std::unique_lock l(m);
443 if (objv != info->version) {
444 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
445 << " version mismatch, canceling: tid=" << tid << dendl;
446 return -ECANCELED;
447 }
448 auto err = info->apply_update(update);
449 if (err) {
450 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
451 << " error applying update: " << *err << " tid=" << tid << dendl;
452 return -ECANCELED;
453 }
454
455 ++info->version.ver;
456
457 return {};
458}
459
460int FIFO::_update_meta(const fifo::update& update,
461 fifo::objv version, bool* pcanceled,
462 std::uint64_t tid, optional_yield y)
463{
464 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
465 << " entering: tid=" << tid << dendl;
466 lr::ObjectWriteOperation op;
467 bool canceled = false;
468 update_meta(&op, info.version, update);
469 auto r = rgw_rados_operate(ioctx, oid, &op, y);
470 if (r >= 0 || r == -ECANCELED) {
471 canceled = (r == -ECANCELED);
472 if (!canceled) {
473 r = apply_update(&info, version, update, tid);
474 if (r < 0) canceled = true;
475 }
476 if (canceled) {
477 r = read_meta(tid, y);
478 canceled = r < 0 ? false : true;
479 }
480 }
481 if (pcanceled) *pcanceled = canceled;
482 if (canceled) {
483 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
484 << " canceled: tid=" << tid << dendl;
485 }
486 if (r < 0) {
487 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
488 << " returning error: r=" << r << " tid=" << tid << dendl;
489 }
490 return r;
491}
492
493struct Updater : public Completion<Updater> {
494 FIFO* fifo;
495 fifo::update update;
496 fifo::objv version;
497 bool reread = false;
498 bool* pcanceled = nullptr;
499 std::uint64_t tid;
500 Updater(FIFO* fifo, lr::AioCompletion* super,
501 const fifo::update& update, fifo::objv version,
502 bool* pcanceled, std::uint64_t tid)
503 : Completion(super), fifo(fifo), update(update), version(version),
504 pcanceled(pcanceled) {}
505
506 void handle(Ptr&& p, int r) {
507 ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
508 << " entering: tid=" << tid << dendl;
509 if (reread)
510 handle_reread(std::move(p), r);
511 else
512 handle_update(std::move(p), r);
513 }
514
515 void handle_update(Ptr&& p, int r) {
516 ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
517 << " handling async update_meta: tid="
518 << tid << dendl;
519 if (r < 0 && r != -ECANCELED) {
520 lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
521 << " update failed: r=" << r << " tid=" << tid << dendl;
522 complete(std::move(p), r);
523 return;
524 }
525 bool canceled = (r == -ECANCELED);
526 if (!canceled) {
527 int r = fifo->apply_update(&fifo->info, version, update, tid);
528 if (r < 0) {
529 ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
530 << " update failed, marking canceled: r=" << r
531 << " tid=" << tid << dendl;
532 canceled = true;
533 }
534 }
535 if (canceled) {
536 reread = true;
537 fifo->read_meta(tid, call(std::move(p)));
538 return;
539 }
540 if (pcanceled)
541 *pcanceled = false;
542 ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
543 << " completing: tid=" << tid << dendl;
544 complete(std::move(p), 0);
545 }
546
547 void handle_reread(Ptr&& p, int r) {
548 ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
549 << " handling async read_meta: tid="
550 << tid << dendl;
551 if (r < 0 && pcanceled) {
552 *pcanceled = false;
553 } else if (r >= 0 && pcanceled) {
554 *pcanceled = true;
555 }
556 if (r < 0) {
557 lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
558 << " failed dispatching read_meta: r=" << r << " tid="
559 << tid << dendl;
560 } else {
561 ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
562 << " completing: tid=" << tid << dendl;
563 }
564 complete(std::move(p), r);
565 }
566};
567
568void FIFO::_update_meta(const fifo::update& update,
569 fifo::objv version, bool* pcanceled,
570 std::uint64_t tid, lr::AioCompletion* c)
571{
572 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
573 << " entering: tid=" << tid << dendl;
574 lr::ObjectWriteOperation op;
575 update_meta(&op, info.version, update);
576 auto updater = std::make_unique<Updater>(this, c, update, version, pcanceled,
577 tid);
578 auto r = ioctx.aio_operate(oid, Updater::call(std::move(updater)), &op);
579 assert(r >= 0);
580}
581
582int FIFO::create_part(int64_t part_num, std::string_view tag, std::uint64_t tid,
583 optional_yield y)
584{
585 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
586 << " entering: tid=" << tid << dendl;
587 lr::ObjectWriteOperation op;
588 op.create(false); /* We don't need exclusivity, part_init ensures
589 we're creating from the same journal entry. */
590 std::unique_lock l(m);
591 part_init(&op, tag, info.params);
592 auto oid = info.part_oid(part_num);
593 l.unlock();
594 auto r = rgw_rados_operate(ioctx, oid, &op, y);
595 if (r < 0) {
596 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
597 << " part_init failed: r=" << r << " tid="
598 << tid << dendl;
599 }
600 return r;
601}
602
603int FIFO::remove_part(int64_t part_num, std::string_view tag, std::uint64_t tid,
604 optional_yield y)
605{
606 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
607 << " entering: tid=" << tid << dendl;
608 lr::ObjectWriteOperation op;
609 op.remove();
610 std::unique_lock l(m);
611 auto oid = info.part_oid(part_num);
612 l.unlock();
613 auto r = rgw_rados_operate(ioctx, oid, &op, y);
614 if (r < 0) {
615 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
616 << " remove failed: r=" << r << " tid="
617 << tid << dendl;
618 }
619 return r;
620}
621
622int FIFO::process_journal(std::uint64_t tid, optional_yield y)
623{
624 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
625 << " entering: tid=" << tid << dendl;
626 std::vector<fifo::journal_entry> processed;
627
628 std::unique_lock l(m);
629 auto tmpjournal = info.journal;
630 auto new_tail = info.tail_part_num;
631 auto new_head = info.head_part_num;
632 auto new_max = info.max_push_part_num;
633 l.unlock();
634
635 int r = 0;
636 for (auto& [n, entry] : tmpjournal) {
637 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
638 << " processing entry: entry=" << entry << " tid=" << tid
639 << dendl;
640 switch (entry.op) {
641 case fifo::journal_entry::Op::create:
642 r = create_part(entry.part_num, entry.part_tag, tid, y);
643 if (entry.part_num > new_max) {
644 new_max = entry.part_num;
645 }
646 break;
647 case fifo::journal_entry::Op::set_head:
648 r = 0;
649 if (entry.part_num > new_head) {
650 new_head = entry.part_num;
651 }
652 break;
653 case fifo::journal_entry::Op::remove:
654 r = remove_part(entry.part_num, entry.part_tag, tid, y);
655 if (r == -ENOENT) r = 0;
656 if (entry.part_num >= new_tail) {
657 new_tail = entry.part_num + 1;
658 }
659 break;
660 default:
661 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
662 << " unknown journaled op: entry=" << entry << " tid="
663 << tid << dendl;
664 return -EIO;
665 }
666
667 if (r < 0) {
668 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
669 << " processing entry failed: entry=" << entry
670 << " r=" << r << " tid=" << tid << dendl;
671 return -r;
672 }
673
674 processed.push_back(std::move(entry));
675 }
676
677 // Postprocess
678 bool canceled = true;
679
680 for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) {
681 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
682 << " postprocessing: i=" << i << " tid=" << tid << dendl;
683
684 std::optional<int64_t> tail_part_num;
685 std::optional<int64_t> head_part_num;
686 std::optional<int64_t> max_part_num;
687
688 std::unique_lock l(m);
689 auto objv = info.version;
690 if (new_tail > tail_part_num) tail_part_num = new_tail;
691 if (new_head > info.head_part_num) head_part_num = new_head;
692 if (new_max > info.max_push_part_num) max_part_num = new_max;
693 l.unlock();
694
695 if (processed.empty() &&
696 !tail_part_num &&
697 !max_part_num) {
698 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
699 << " nothing to update any more: i=" << i << " tid="
700 << tid << dendl;
701 canceled = false;
702 break;
703 }
704 auto u = fifo::update().tail_part_num(tail_part_num)
705 .head_part_num(head_part_num).max_push_part_num(max_part_num)
706 .journal_entries_rm(processed);
707 r = _update_meta(u, objv, &canceled, tid, y);
708 if (r < 0) {
709 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
710 << " _update_meta failed: update=" << u
711 << " r=" << r << " tid=" << tid << dendl;
712 break;
713 }
714
715 if (canceled) {
716 std::vector<fifo::journal_entry> new_processed;
717 std::unique_lock l(m);
718 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
719 << " update canceled, retrying: i=" << i << " tid="
720 << tid << dendl;
721 for (auto& e : processed) {
722 auto jiter = info.journal.find(e.part_num);
723 /* journal entry was already processed */
724 if (jiter == info.journal.end() ||
725 !(jiter->second == e)) {
726 continue;
727 }
728 new_processed.push_back(e);
729 }
730 processed = std::move(new_processed);
731 }
732 }
733 if (r == 0 && canceled) {
734 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
735 << " canceled too many times, giving up: tid=" << tid << dendl;
736 r = -ECANCELED;
737 }
738 if (r < 0) {
739 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
740 << " failed, r=: " << r << " tid=" << tid << dendl;
741 }
742 return r;
743}
744
745int FIFO::_prepare_new_part(bool is_head, std::uint64_t tid, optional_yield y)
746{
747 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
748 << " entering: tid=" << tid << dendl;
749 std::unique_lock l(m);
750 std::vector jentries = { info.next_journal_entry(generate_tag()) };
751 if (info.journal.find(jentries.front().part_num) != info.journal.end()) {
752 l.unlock();
753 ldout(cct, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
754 << " new part journaled, but not processed: tid="
755 << tid << dendl;
756 auto r = process_journal(tid, y);
757 if (r < 0) {
758 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
759 << " process_journal failed: r=" << r << " tid=" << tid << dendl;
760 }
761 return r;
762 }
763 std::int64_t new_head_part_num = info.head_part_num;
764 auto version = info.version;
765
766 if (is_head) {
767 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
768 << " needs new head: tid=" << tid << dendl;
769 auto new_head_jentry = jentries.front();
770 new_head_jentry.op = fifo::journal_entry::Op::set_head;
771 new_head_part_num = jentries.front().part_num;
772 jentries.push_back(std::move(new_head_jentry));
773 }
774 l.unlock();
775
776 int r = 0;
777 bool canceled = true;
778 for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) {
779 canceled = false;
780 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
781 << " updating metadata: i=" << i << " tid=" << tid << dendl;
782 auto u = fifo::update{}.journal_entries_add(jentries);
783 r = _update_meta(u, version, &canceled, tid, y);
784 if (r >= 0 && canceled) {
785 std::unique_lock l(m);
786 auto found = (info.journal.find(jentries.front().part_num) !=
787 info.journal.end());
788 if ((info.max_push_part_num >= jentries.front().part_num &&
789 info.head_part_num >= new_head_part_num)) {
790 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
791 << " raced, but journaled and processed: i=" << i
792 << " tid=" << tid << dendl;
793 return 0;
794 }
795 if (found) {
796 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
797 << " raced, journaled but not processed: i=" << i
798 << " tid=" << tid << dendl;
799 canceled = false;
800 }
801 l.unlock();
802 }
803 if (r < 0) {
804 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
805 << " _update_meta failed: update=" << u << " r=" << r
806 << " tid=" << tid << dendl;
807 return r;
808 }
809 }
810 if (canceled) {
811 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
812 << " canceled too many times, giving up: tid=" << tid << dendl;
813 return -ECANCELED;
814 }
815 r = process_journal(tid, y);
816 if (r < 0) {
817 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
818 << " process_journal failed: r=" << r << " tid=" << tid << dendl;
819 }
820 return r;
821}
822
823int FIFO::_prepare_new_head(std::uint64_t tid, optional_yield y)
824{
825 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
826 << " entering: tid=" << tid << dendl;
827 std::unique_lock l(m);
828 std::int64_t new_head_num = info.head_part_num + 1;
829 auto max_push_part_num = info.max_push_part_num;
830 auto version = info.version;
831 l.unlock();
832
833 int r = 0;
834 if (max_push_part_num < new_head_num) {
835 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
836 << " need new part: tid=" << tid << dendl;
837 r = _prepare_new_part(true, tid, y);
838 if (r < 0) {
839 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
840 << " _prepare_new_part failed: r=" << r
841 << " tid=" << tid << dendl;
842 return r;
843 }
844 std::unique_lock l(m);
845 if (info.max_push_part_num < new_head_num) {
846 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
847 << " inconsistency, push part less than head part: "
848 << " tid=" << tid << dendl;
849 return -EIO;
850 }
851 l.unlock();
852 return 0;
853 }
854
855 bool canceled = true;
856 for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) {
857 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
858 << " updating head: i=" << i << " tid=" << tid << dendl;
859 auto u = fifo::update{}.head_part_num(new_head_num);
860 r = _update_meta(u, version, &canceled, tid, y);
861 if (r < 0) {
862 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
863 << " _update_meta failed: update=" << u << " r=" << r
864 << " tid=" << tid << dendl;
865 return r;
866 }
867 std::unique_lock l(m);
868 auto head_part_num = info.head_part_num;
869 version = info.version;
870 l.unlock();
871 if (canceled && (head_part_num >= new_head_num)) {
872 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
873 << " raced, but completed by the other caller: i=" << i
874 << " tid=" << tid << dendl;
875 canceled = false;
876 }
877 }
878 if (canceled) {
879 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
880 << " canceled too many times, giving up: tid=" << tid << dendl;
881 return -ECANCELED;
882 }
883 return 0;
884}
885
886struct NewPartPreparer : public Completion<NewPartPreparer> {
887 FIFO* f;
888 std::vector<fifo::journal_entry> jentries;
889 int i = 0;
890 std::int64_t new_head_part_num;
891 bool canceled = false;
892 uint64_t tid;
893
894 NewPartPreparer(FIFO* f, lr::AioCompletion* super,
895 std::vector<fifo::journal_entry> jentries,
896 std::int64_t new_head_part_num,
897 std::uint64_t tid)
898 : Completion(super), f(f), jentries(std::move(jentries)),
899 new_head_part_num(new_head_part_num), tid(tid) {}
900
901 void handle(Ptr&& p, int r) {
902 ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
903 << " entering: tid=" << tid << dendl;
904 if (r < 0) {
905 lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
906 << " _update_meta failed: r=" << r
907 << " tid=" << tid << dendl;
908 complete(std::move(p), r);
909 return;
910 }
911
912 if (canceled) {
913 std::unique_lock l(f->m);
914 auto iter = f->info.journal.find(jentries.front().part_num);
915 auto max_push_part_num = f->info.max_push_part_num;
916 auto head_part_num = f->info.head_part_num;
917 auto version = f->info.version;
918 auto found = (iter != f->info.journal.end());
919 l.unlock();
920 if ((max_push_part_num >= jentries.front().part_num &&
921 head_part_num >= new_head_part_num)) {
922 ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
923 << " raced, but journaled and processed: i=" << i
924 << " tid=" << tid << dendl;
925 complete(std::move(p), 0);
926 return;
927 }
928 if (i >= MAX_RACE_RETRIES) {
929 complete(std::move(p), -ECANCELED);
930 return;
931 }
932 if (!found) {
933 ++i;
934 f->_update_meta(fifo::update{}
935 .journal_entries_add(jentries),
936 version, &canceled, tid, call(std::move(p)));
937 return;
938 } else {
939 ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
940 << " raced, journaled but not processed: i=" << i
941 << " tid=" << tid << dendl;
942 canceled = false;
943 }
944 // Fall through. We still need to process the journal.
945 }
946 f->process_journal(tid, super());
947 return;
948 }
949};
950
951void FIFO::_prepare_new_part(bool is_head, std::uint64_t tid,
952 lr::AioCompletion* c)
953{
954 std::unique_lock l(m);
955 std::vector jentries = { info.next_journal_entry(generate_tag()) };
956 if (info.journal.find(jentries.front().part_num) != info.journal.end()) {
957 l.unlock();
958 ldout(cct, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
959 << " new part journaled, but not processed: tid="
960 << tid << dendl;
961 process_journal(tid, c);
962 return;
963 }
964 std::int64_t new_head_part_num = info.head_part_num;
965 auto version = info.version;
966
967 if (is_head) {
968 auto new_head_jentry = jentries.front();
969 new_head_jentry.op = fifo::journal_entry::Op::set_head;
970 new_head_part_num = jentries.front().part_num;
971 jentries.push_back(std::move(new_head_jentry));
972 }
973 l.unlock();
974
975 auto n = std::make_unique<NewPartPreparer>(this, c, jentries,
976 new_head_part_num, tid);
977 auto np = n.get();
978 _update_meta(fifo::update{}.journal_entries_add(jentries), version,
979 &np->canceled, tid, NewPartPreparer::call(std::move(n)));
980}
981
982struct NewHeadPreparer : public Completion<NewHeadPreparer> {
983 FIFO* f;
984 int i = 0;
985 bool newpart;
986 std::int64_t new_head_num;
987 bool canceled = false;
988 std::uint64_t tid;
989
990 NewHeadPreparer(FIFO* f, lr::AioCompletion* super,
991 bool newpart, std::int64_t new_head_num, std::uint64_t tid)
992 : Completion(super), f(f), newpart(newpart), new_head_num(new_head_num),
993 tid(tid) {}
994
995 void handle(Ptr&& p, int r) {
996 if (newpart)
997 handle_newpart(std::move(p), r);
998 else
999 handle_update(std::move(p), r);
1000 }
1001
1002 void handle_newpart(Ptr&& p, int r) {
1003 if (r < 0) {
1004 lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1005 << " _prepare_new_part failed: r=" << r
1006 << " tid=" << tid << dendl;
1007 complete(std::move(p), r);
1008 return;
1009 }
1010 std::unique_lock l(f->m);
1011 if (f->info.max_push_part_num < new_head_num) {
1012 l.unlock();
1013 lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1014 << " _prepare_new_part failed: r=" << r
1015 << " tid=" << tid << dendl;
1016 complete(std::move(p), -EIO);
1017 } else {
1018 l.unlock();
1019 complete(std::move(p), 0);
1020 }
1021 }
1022
1023 void handle_update(Ptr&& p, int r) {
1024 std::unique_lock l(f->m);
1025 auto head_part_num = f->info.head_part_num;
1026 auto version = f->info.version;
1027 l.unlock();
1028
1029 if (r < 0) {
1030 lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1031 << " _update_meta failed: r=" << r
1032 << " tid=" << tid << dendl;
1033 complete(std::move(p), r);
1034 return;
1035 }
1036 if (canceled) {
1037 if (i >= MAX_RACE_RETRIES) {
1038 lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1039 << " canceled too many times, giving up: tid=" << tid << dendl;
1040 complete(std::move(p), -ECANCELED);
1041 return;
1042 }
1043
1044 // Raced, but there's still work to do!
1045 if (head_part_num < new_head_num) {
1046 canceled = false;
1047 ++i;
1048 ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1049 << " updating head: i=" << i << " tid=" << tid << dendl;
1050 f->_update_meta(fifo::update{}.head_part_num(new_head_num),
1051 version, &this->canceled, tid, call(std::move(p)));
1052 return;
1053 }
1054 }
1055 ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1056 << " succeeded : i=" << i << " tid=" << tid << dendl;
1057 complete(std::move(p), 0);
1058 return;
1059 }
1060};
1061
1062void FIFO::_prepare_new_head(std::uint64_t tid, lr::AioCompletion* c)
1063{
1064 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1065 << " entering: tid=" << tid << dendl;
1066 std::unique_lock l(m);
1067 int64_t new_head_num = info.head_part_num + 1;
1068 auto max_push_part_num = info.max_push_part_num;
1069 auto version = info.version;
1070 l.unlock();
1071
1072 if (max_push_part_num < new_head_num) {
1073 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1074 << " need new part: tid=" << tid << dendl;
1075 auto n = std::make_unique<NewHeadPreparer>(this, c, true, new_head_num,
1076 tid);
1077 _prepare_new_part(true, tid, NewHeadPreparer::call(std::move(n)));
1078 } else {
1079 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1080 << " updating head: tid=" << tid << dendl;
1081 auto n = std::make_unique<NewHeadPreparer>(this, c, false, new_head_num,
1082 tid);
1083 auto np = n.get();
1084 _update_meta(fifo::update{}.head_part_num(new_head_num), version,
1085 &np->canceled, tid, NewHeadPreparer::call(std::move(n)));
1086 }
1087}
1088
1089int FIFO::push_entries(const std::deque<cb::list>& data_bufs,
1090 std::uint64_t tid, optional_yield y)
1091{
1092 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1093 << " entering: tid=" << tid << dendl;
1094 std::unique_lock l(m);
1095 auto head_part_num = info.head_part_num;
1096 auto tag = info.head_tag;
1097 const auto part_oid = info.part_oid(head_part_num);
1098 l.unlock();
1099
1100 auto r = push_part(ioctx, part_oid, tag, data_bufs, tid, y);
1101 if (r < 0) {
1102 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1103 << " push_part failed: r=" << r << " tid=" << tid << dendl;
1104 }
1105 return r;
1106}
1107
1108void FIFO::push_entries(const std::deque<cb::list>& data_bufs,
1109 std::uint64_t tid, lr::AioCompletion* c)
1110{
1111 std::unique_lock l(m);
1112 auto head_part_num = info.head_part_num;
1113 auto tag = info.head_tag;
1114 const auto part_oid = info.part_oid(head_part_num);
1115 l.unlock();
1116
1117 push_part(ioctx, part_oid, tag, data_bufs, tid, c);
1118}
1119
1120int FIFO::trim_part(int64_t part_num, uint64_t ofs,
1121 std::optional<std::string_view> tag,
1122 bool exclusive, std::uint64_t tid,
1123 optional_yield y)
1124{
1125 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1126 << " entering: tid=" << tid << dendl;
1127 lr::ObjectWriteOperation op;
1128 std::unique_lock l(m);
1129 const auto part_oid = info.part_oid(part_num);
1130 l.unlock();
1131 rgw::cls::fifo::trim_part(&op, tag, ofs, exclusive);
1132 auto r = rgw_rados_operate(ioctx, part_oid, &op, y);
1133 if (r < 0) {
1134 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1135 << " trim_part failed: r=" << r << " tid=" << tid << dendl;
1136 }
1137 return 0;
1138}
1139
1140void FIFO::trim_part(int64_t part_num, uint64_t ofs,
1141 std::optional<std::string_view> tag,
1142 bool exclusive, std::uint64_t tid,
1143 lr::AioCompletion* c)
1144{
1145 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1146 << " entering: tid=" << tid << dendl;
1147 lr::ObjectWriteOperation op;
1148 std::unique_lock l(m);
1149 const auto part_oid = info.part_oid(part_num);
1150 l.unlock();
1151 rgw::cls::fifo::trim_part(&op, tag, ofs, exclusive);
1152 auto r = ioctx.aio_operate(part_oid, c, &op);
1153 ceph_assert(r >= 0);
1154}
1155
1156int FIFO::open(lr::IoCtx ioctx, std::string oid, std::unique_ptr<FIFO>* fifo,
1157 optional_yield y, std::optional<fifo::objv> objv,
1158 bool probe)
1159{
1160 auto cct = static_cast<CephContext*>(ioctx.cct());
1161 ldout(cct, 20)
1162 << __PRETTY_FUNCTION__ << ":" << __LINE__
1163 << " entering" << dendl;
1164 fifo::info info;
1165 std::uint32_t size;
1166 std::uint32_t over;
1167 int r = get_meta(ioctx, std::move(oid), objv, &info, &size, &over, 0, y,
1168 probe);
1169 if (r < 0) {
1170 if (!(probe && (r == -ENOENT || r == -ENODATA))) {
1171 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1172 << " get_meta failed: r=" << r << dendl;
1173 }
1174 return r;
1175 }
1176 std::unique_ptr<FIFO> f(new FIFO(std::move(ioctx), oid));
1177 f->info = info;
1178 f->part_header_size = size;
1179 f->part_entry_overhead = over;
1180 // If there are journal entries, process them, in case
1181 // someone crashed mid-transaction.
1182 if (!info.journal.empty()) {
1183 ldout(cct, 20)
1184 << __PRETTY_FUNCTION__ << ":" << __LINE__
1185 << " processing leftover journal" << dendl;
1186 r = f->process_journal(0, y);
1187 if (r < 0) {
1188 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1189 << " process_journal failed: r=" << r << dendl;
1190 return r;
1191 }
1192 }
1193 *fifo = std::move(f);
1194 return 0;
1195}
1196
1197int FIFO::create(lr::IoCtx ioctx, std::string oid, std::unique_ptr<FIFO>* fifo,
1198 optional_yield y, std::optional<fifo::objv> objv,
1199 std::optional<std::string_view> oid_prefix,
1200 bool exclusive, std::uint64_t max_part_size,
1201 std::uint64_t max_entry_size)
1202{
1203 auto cct = static_cast<CephContext*>(ioctx.cct());
1204 ldout(cct, 20)
1205 << __PRETTY_FUNCTION__ << ":" << __LINE__
1206 << " entering" << dendl;
1207 lr::ObjectWriteOperation op;
1208 create_meta(&op, oid, objv, oid_prefix, exclusive, max_part_size,
1209 max_entry_size);
1210 auto r = rgw_rados_operate(ioctx, oid, &op, y);
1211 if (r < 0) {
1212 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1213 << " create_meta failed: r=" << r << dendl;
1214 return r;
1215 }
1216 r = open(std::move(ioctx), std::move(oid), fifo, y, objv);
1217 return r;
1218}
1219
1220int FIFO::read_meta(std::uint64_t tid, optional_yield y) {
1221 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1222 << " entering: tid=" << tid << dendl;
1223 fifo::info _info;
1224 std::uint32_t _phs;
1225 std::uint32_t _peo;
1226
1227 auto r = get_meta(ioctx, oid, nullopt, &_info, &_phs, &_peo, tid, y);
1228 if (r < 0) {
1229 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1230 << " get_meta failed: r=" << r << " tid=" << tid << dendl;
1231 return r;
1232 }
1233 std::unique_lock l(m);
1234 // We have a newer version already!
1235 if (_info.version.same_or_later(this->info.version)) {
1236 info = std::move(_info);
1237 part_header_size = _phs;
1238 part_entry_overhead = _peo;
1239 }
1240 return 0;
1241}
1242
1243int FIFO::read_meta(optional_yield y) {
1244 std::unique_lock l(m);
1245 auto tid = ++next_tid;
1246 l.unlock();
1247 return read_meta(tid, y);
1248}
1249
1250struct Reader : public Completion<Reader> {
1251 FIFO* fifo;
1252 cb::list bl;
1253 std::uint64_t tid;
1254 Reader(FIFO* fifo, lr::AioCompletion* super, std::uint64_t tid)
1255 : Completion(super), fifo(fifo), tid(tid) {}
1256
1257 void handle(Ptr&& p, int r) {
1258 auto cct = fifo->cct;
1259 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1260 << " entering: tid=" << tid << dendl;
1261 if (r >= 0) try {
1262 fifo::op::get_meta_reply reply;
1263 auto iter = bl.cbegin();
1264 decode(reply, iter);
1265 std::unique_lock l(fifo->m);
1266 if (reply.info.version.same_or_later(fifo->info.version)) {
1267 fifo->info = std::move(reply.info);
1268 fifo->part_header_size = reply.part_header_size;
1269 fifo->part_entry_overhead = reply.part_entry_overhead;
1270 }
1271 } catch (const cb::error& err) {
1272 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1273 << " failed to decode response err=" << err.what()
1274 << " tid=" << tid << dendl;
1275 r = from_error_code(err.code());
1276 } else {
1277 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1278 << " read_meta failed r=" << r
1279 << " tid=" << tid << dendl;
1280 }
1281 complete(std::move(p), r);
1282 }
1283};
1284
1285void FIFO::read_meta(std::uint64_t tid, lr::AioCompletion* c)
1286{
1287 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1288 << " entering: tid=" << tid << dendl;
1289 lr::ObjectReadOperation op;
1290 fifo::op::get_meta gm;
1291 cb::list in;
1292 encode(gm, in);
1293 auto reader = std::make_unique<Reader>(this, c, tid);
1294 auto rp = reader.get();
1295 auto r = ioctx.aio_exec(oid, Reader::call(std::move(reader)), fifo::op::CLASS,
1296 fifo::op::GET_META, in, &rp->bl);
1297 assert(r >= 0);
1298}
1299
1300const fifo::info& FIFO::meta() const {
1301 return info;
1302}
1303
1304std::pair<std::uint32_t, std::uint32_t> FIFO::get_part_layout_info() const {
1305 return {part_header_size, part_entry_overhead};
1306}
1307
1308int FIFO::push(const cb::list& bl, optional_yield y) {
1309 return push(std::vector{ bl }, y);
1310}
1311
1312void FIFO::push(const cb::list& bl, lr::AioCompletion* c) {
1313 push(std::vector{ bl }, c);
1314}
1315
1316int FIFO::push(const std::vector<cb::list>& data_bufs, optional_yield y)
1317{
1318 std::unique_lock l(m);
1319 auto tid = ++next_tid;
1320 auto max_entry_size = info.params.max_entry_size;
1321 auto need_new_head = info.need_new_head();
1322 l.unlock();
1323 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1324 << " entering: tid=" << tid << dendl;
1325 if (data_bufs.empty()) {
1326 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1327 << " empty push, returning success tid=" << tid << dendl;
1328 return 0;
1329 }
1330
1331 // Validate sizes
1332 for (const auto& bl : data_bufs) {
1333 if (bl.length() > max_entry_size) {
1334 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1335 << " entry bigger than max_entry_size tid=" << tid << dendl;
1336 return -E2BIG;
1337 }
1338 }
1339
1340 int r = 0;
1341 if (need_new_head) {
1342 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1343 << " need new head tid=" << tid << dendl;
1344 r = _prepare_new_head(tid, y);
1345 if (r < 0) {
1346 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1347 << " _prepare_new_head failed: r=" << r
1348 << " tid=" << tid << dendl;
1349 return r;
1350 }
1351 }
1352
1353 std::deque<cb::list> remaining(data_bufs.begin(), data_bufs.end());
1354 std::deque<cb::list> batch;
1355
1356 uint64_t batch_len = 0;
1357 auto retries = 0;
1358 bool canceled = true;
1359 while ((!remaining.empty() || !batch.empty()) &&
1360 (retries <= MAX_RACE_RETRIES)) {
1361 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1362 << " preparing push: remaining=" << remaining.size()
1363 << " batch=" << batch.size() << " retries=" << retries
1364 << " tid=" << tid << dendl;
1365 std::unique_lock l(m);
1366 auto max_part_size = info.params.max_part_size;
1367 auto overhead = part_entry_overhead;
1368 l.unlock();
1369
1370 while (!remaining.empty() &&
1371 (remaining.front().length() + batch_len <= max_part_size)) {
1372 /* We can send entries with data_len up to max_entry_size,
1373 however, we want to also account the overhead when
1374 dealing with multiple entries. Previous check doesn't
1375 account for overhead on purpose. */
1376 batch_len += remaining.front().length() + overhead;
1377 batch.push_back(std::move(remaining.front()));
1378 remaining.pop_front();
1379 }
1380 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1381 << " prepared push: remaining=" << remaining.size()
1382 << " batch=" << batch.size() << " retries=" << retries
1383 << " batch_len=" << batch_len
1384 << " tid=" << tid << dendl;
1385
1386 auto r = push_entries(batch, tid, y);
1387 if (r == -ERANGE) {
1388 canceled = true;
1389 ++retries;
1390 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1391 << " need new head tid=" << tid << dendl;
1392 r = _prepare_new_head(tid, y);
1393 if (r < 0) {
1394 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1395 << " prepare_new_head failed: r=" << r
1396 << " tid=" << tid << dendl;
1397 return r;
1398 }
1399 r = 0;
1400 continue;
1401 }
1402 if (r < 0) {
1403 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1404 << " push_entries failed: r=" << r
1405 << " tid=" << tid << dendl;
1406 return r;
1407 }
1408 // Made forward progress!
1409 canceled = false;
1410 retries = 0;
1411 batch_len = 0;
1412 if (static_cast<unsigned>(r) == batch.size()) {
1413 batch.clear();
1414 } else {
1415 batch.erase(batch.begin(), batch.begin() + r);
1416 for (const auto& b : batch) {
1417 batch_len += b.length() + part_entry_overhead;
1418 }
1419 }
1420 }
1421 if (canceled) {
1422 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1423 << " canceled too many times, giving up: tid=" << tid << dendl;
1424 return -ECANCELED;
1425 }
1426 return 0;
1427}
1428
1429struct Pusher : public Completion<Pusher> {
1430 FIFO* f;
1431 std::deque<cb::list> remaining;
1432 std::deque<cb::list> batch;
1433 int i = 0;
1434 std::uint64_t tid;
1435 bool new_heading = false;
1436
1437 void prep_then_push(Ptr&& p, const unsigned successes) {
1438 std::unique_lock l(f->m);
1439 auto max_part_size = f->info.params.max_part_size;
1440 auto part_entry_overhead = f->part_entry_overhead;
1441 l.unlock();
1442
1443 ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1444 << " preparing push: remaining=" << remaining.size()
1445 << " batch=" << batch.size() << " i=" << i
1446 << " tid=" << tid << dendl;
1447
1448 uint64_t batch_len = 0;
1449 if (successes > 0) {
1450 if (successes == batch.size()) {
1451 batch.clear();
1452 } else {
1453 batch.erase(batch.begin(), batch.begin() + successes);
1454 for (const auto& b : batch) {
1455 batch_len += b.length() + part_entry_overhead;
1456 }
1457 }
1458 }
1459
1460 if (batch.empty() && remaining.empty()) {
1461 complete(std::move(p), 0);
1462 return;
1463 }
1464
1465 while (!remaining.empty() &&
1466 (remaining.front().length() + batch_len <= max_part_size)) {
1467
1468 /* We can send entries with data_len up to max_entry_size,
1469 however, we want to also account the overhead when
1470 dealing with multiple entries. Previous check doesn't
1471 account for overhead on purpose. */
1472 batch_len += remaining.front().length() + part_entry_overhead;
1473 batch.push_back(std::move(remaining.front()));
1474 remaining.pop_front();
1475 }
1476 ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1477 << " prepared push: remaining=" << remaining.size()
1478 << " batch=" << batch.size() << " i=" << i
1479 << " batch_len=" << batch_len
1480 << " tid=" << tid << dendl;
1481 push(std::move(p));
1482 }
1483
1484 void push(Ptr&& p) {
1485 f->push_entries(batch, tid, call(std::move(p)));
1486 }
1487
1488 void new_head(Ptr&& p) {
1489 new_heading = true;
1490 f->_prepare_new_head(tid, call(std::move(p)));
1491 }
1492
1493 void handle(Ptr&& p, int r) {
1494 if (!new_heading) {
1495 if (r == -ERANGE) {
1496 ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1497 << " need new head tid=" << tid << dendl;
1498 new_head(std::move(p));
1499 return;
1500 }
1501 if (r < 0) {
1502 lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1503 << " push_entries failed: r=" << r
1504 << " tid=" << tid << dendl;
1505 complete(std::move(p), r);
1506 return;
1507 }
1508 i = 0; // We've made forward progress, so reset the race counter!
1509 prep_then_push(std::move(p), r);
1510 } else {
1511 if (r < 0) {
1512 lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1513 << " prepare_new_head failed: r=" << r
1514 << " tid=" << tid << dendl;
1515 complete(std::move(p), r);
1516 return;
1517 }
1518 new_heading = false;
1519 handle_new_head(std::move(p), r);
1520 }
1521 }
1522
1523 void handle_new_head(Ptr&& p, int r) {
1524 if (r == -ECANCELED) {
1525 if (p->i == MAX_RACE_RETRIES) {
1526 lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1527 << " canceled too many times, giving up: tid=" << tid << dendl;
1528 complete(std::move(p), -ECANCELED);
1529 return;
1530 }
1531 ++p->i;
1532 } else if (r) {
1533 complete(std::move(p), r);
1534 return;
1535 }
1536
1537 if (p->batch.empty()) {
1538 prep_then_push(std::move(p), 0);
1539 return;
1540 } else {
1541 push(std::move(p));
1542 return;
1543 }
1544 }
1545
1546 Pusher(FIFO* f, std::deque<cb::list>&& remaining,
1547 std::uint64_t tid, lr::AioCompletion* super)
1548 : Completion(super), f(f), remaining(std::move(remaining)),
1549 tid(tid) {}
1550};
1551
1552void FIFO::push(const std::vector<cb::list>& data_bufs,
1553 lr::AioCompletion* c)
1554{
1555 std::unique_lock l(m);
1556 auto tid = ++next_tid;
1557 auto max_entry_size = info.params.max_entry_size;
1558 auto need_new_head = info.need_new_head();
1559 l.unlock();
1560 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1561 << " entering: tid=" << tid << dendl;
1562 auto p = std::make_unique<Pusher>(this, std::deque<cb::list>(data_bufs.begin(), data_bufs.end()),
1563 tid, c);
1564 // Validate sizes
1565 for (const auto& bl : data_bufs) {
1566 if (bl.length() > max_entry_size) {
1567 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1568 << " entry bigger than max_entry_size tid=" << tid << dendl;
1569 Pusher::complete(std::move(p), -E2BIG);
1570 return;
1571 }
1572 }
1573
1574 if (data_bufs.empty() ) {
1575 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1576 << " empty push, returning success tid=" << tid << dendl;
1577 Pusher::complete(std::move(p), 0);
1578 return;
1579 }
1580
1581 if (need_new_head) {
1582 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1583 << " need new head tid=" << tid << dendl;
1584 p->new_head(std::move(p));
1585 } else {
1586 p->prep_then_push(std::move(p), 0);
1587 }
1588}
1589
1590int FIFO::list(int max_entries,
1591 std::optional<std::string_view> markstr,
1592 std::vector<list_entry>* presult, bool* pmore,
1593 optional_yield y)
1594{
1595 std::unique_lock l(m);
1596 auto tid = ++next_tid;
1597 std::int64_t part_num = info.tail_part_num;
1598 l.unlock();
1599 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1600 << " entering: tid=" << tid << dendl;
1601 std::uint64_t ofs = 0;
1602 if (markstr) {
1603 auto marker = to_marker(*markstr);
1604 if (!marker) {
1605 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1606 << " invalid marker string: " << markstr
1607 << " tid= "<< tid << dendl;
1608 return -EINVAL;
1609 }
1610 part_num = marker->num;
1611 ofs = marker->ofs;
1612 }
1613
1614 std::vector<list_entry> result;
1615 result.reserve(max_entries);
1616 bool more = false;
1617
1618 std::vector<fifo::part_list_entry> entries;
1619 int r = 0;
1620 while (max_entries > 0) {
1621 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1622 << " max_entries=" << max_entries << " tid=" << tid << dendl;
1623 bool part_more = false;
1624 bool part_full = false;
1625
1626 std::unique_lock l(m);
1627 auto part_oid = info.part_oid(part_num);
1628 l.unlock();
1629
1630 r = list_part(ioctx, part_oid, {}, ofs, max_entries, &entries,
1631 &part_more, &part_full, nullptr, tid, y);
1632 if (r == -ENOENT) {
1633 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1634 << " missing part, rereading metadata"
1635 << " tid= "<< tid << dendl;
1636 r = read_meta(tid, y);
1637 if (r < 0) {
1638 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1639 << " read_meta failed: r=" << r
1640 << " tid= "<< tid << dendl;
1641 return r;
1642 }
1643 if (part_num < info.tail_part_num) {
1644 /* raced with trim? restart */
1645 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1646 << " raced with trim, restarting: tid=" << tid << dendl;
1647 max_entries += result.size();
1648 result.clear();
1649 std::unique_lock l(m);
1650 part_num = info.tail_part_num;
1651 l.unlock();
1652 ofs = 0;
1653 continue;
1654 }
1655 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1656 << " assuming part was not written yet, so end of data: "
1657 << "tid=" << tid << dendl;
1658 more = false;
1659 r = 0;
1660 break;
1661 }
1662 if (r < 0) {
1663 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1664 << " list_entries failed: r=" << r
1665 << " tid= "<< tid << dendl;
1666 return r;
1667 }
1668 more = part_full || part_more;
1669 for (auto& entry : entries) {
1670 list_entry e;
1671 e.data = std::move(entry.data);
1672 e.marker = marker{part_num, entry.ofs}.to_string();
1673 e.mtime = entry.mtime;
1674 result.push_back(std::move(e));
1675 --max_entries;
1676 if (max_entries == 0)
1677 break;
1678 }
1679 entries.clear();
1680 if (max_entries > 0 &&
1681 part_more) {
1682 }
1683
1684 if (!part_full) {
1685 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1686 << " head part is not full, so we can assume we're done: "
1687 << "tid=" << tid << dendl;
1688 break;
1689 }
1690 if (!part_more) {
1691 ++part_num;
1692 ofs = 0;
1693 }
1694 }
1695 if (presult)
1696 *presult = std::move(result);
1697 if (pmore)
1698 *pmore = more;
1699 return 0;
1700}
1701
1702int FIFO::trim(std::string_view markstr, bool exclusive, optional_yield y)
1703{
1704 bool overshoot = false;
1705 auto marker = to_marker(markstr);
1706 if (!marker) {
1707 return -EINVAL;
1708 }
1709 auto part_num = marker->num;
1710 auto ofs = marker->ofs;
1711 std::unique_lock l(m);
1712 auto tid = ++next_tid;
1713 auto hn = info.head_part_num;
1714 const auto max_part_size = info.params.max_part_size;
1715 if (part_num > hn) {
1716 l.unlock();
1717 auto r = read_meta(tid, y);
1718 if (r < 0) {
1719 return r;
1720 }
1721 l.lock();
1722 auto hn = info.head_part_num;
1723 if (part_num > hn) {
1724 overshoot = true;
1725 part_num = hn;
1726 ofs = max_part_size;
1727 }
1728 }
1729 if (part_num < info.tail_part_num) {
1730 return -ENODATA;
1731 }
1732 auto pn = info.tail_part_num;
1733 l.unlock();
1734 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1735 << " entering: tid=" << tid << dendl;
1736
1737 int r = 0;
1738 while (pn < part_num) {
1739 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1740 << " pn=" << pn << " tid=" << tid << dendl;
1741 std::unique_lock l(m);
1742 l.unlock();
1743 r = trim_part(pn, max_part_size, std::nullopt, false, tid, y);
1744 if (r < 0 && r == -ENOENT) {
1745 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1746 << " trim_part failed: r=" << r
1747 << " tid= "<< tid << dendl;
1748 return r;
1749 }
1750 ++pn;
1751 }
1752 r = trim_part(part_num, ofs, std::nullopt, exclusive, tid, y);
1753 if (r < 0 && r != -ENOENT) {
1754 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1755 << " trim_part failed: r=" << r
1756 << " tid= "<< tid << dendl;
1757 return r;
1758 }
1759
1760 l.lock();
1761 auto tail_part_num = info.tail_part_num;
1762 auto objv = info.version;
1763 l.unlock();
1764 bool canceled = tail_part_num < part_num;
1765 int retries = 0;
1766 while ((tail_part_num < part_num) &&
1767 canceled &&
1768 (retries <= MAX_RACE_RETRIES)) {
1769 r = _update_meta(fifo::update{}.tail_part_num(part_num), objv, &canceled,
1770 tid, y);
1771 if (r < 0) {
1772 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1773 << " _update_meta failed: r=" << r
1774 << " tid= "<< tid << dendl;
1775 return r;
1776 }
1777 if (canceled) {
1778 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1779 << " canceled: retries=" << retries
1780 << " tid=" << tid << dendl;
1781 l.lock();
1782 tail_part_num = info.tail_part_num;
1783 objv = info.version;
1784 l.unlock();
1785 ++retries;
1786 }
1787 }
1788 if (canceled) {
1789 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1790 << " canceled too many times, giving up: tid=" << tid << dendl;
1791 return -EIO;
1792 }
1793 return overshoot ? -ENODATA : 0;
1794}
1795
// Completion driving the asynchronous FIFO::trim().
//
// State flags select what the next callback means:
//  - `reread`: the callback is from read_meta(); clamp the target to the
//    (possibly new) head and restart from the current tail.
//  - `!update`: the callback is from trimming a preceding part; trim the next
//    one, or finally the target part itself.
//  - `update`: the callback is from trimming the target part or from
//    _update_meta(); advance tail_part_num, retrying while `canceled` is set
//    and the tail is still behind `part_num`.
struct Trimmer : public Completion<Trimmer> {
  FIFO* fifo;
  std::int64_t part_num;   // part containing the trim marker
  std::uint64_t ofs;       // offset within part_num to trim up to
  std::int64_t pn;         // next preceding part to trim
  bool exclusive;
  std::uint64_t tid;
  bool update = false;
  bool reread = false;
  bool canceled = false;   // out-flag for _update_meta()
  bool overshoot = false;  // marker was past the head; report -ENODATA
  int retries = 0;

  Trimmer(FIFO* fifo, std::int64_t part_num, std::uint64_t ofs, std::int64_t pn,
          bool exclusive, lr::AioCompletion* super, std::uint64_t tid)
    : Completion(super), fifo(fifo), part_num(part_num), ofs(ofs), pn(pn),
      exclusive(exclusive), tid(tid) {}

  void handle(Ptr&& p, int r) {
    auto cct = fifo->cct;
    ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                   << " entering: tid=" << tid << dendl;

    // Metadata re-read finished: recompute the plan against fresh info.
    if (reread) {
      reread = false;
      if (r < 0) {
        lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
                   << " read_meta failed: r="
                   << r << " tid=" << tid << dendl;
        complete(std::move(p), r);
        return;
      }
      std::unique_lock l(fifo->m);
      auto hn = fifo->info.head_part_num;
      const auto max_part_size = fifo->info.params.max_part_size;
      const auto tail_part_num = fifo->info.tail_part_num;
      l.unlock();
      // Marker beyond the head: trim the whole head part and flag overshoot.
      if (part_num > hn) {
        part_num = hn;
        ofs = max_part_size;
        overshoot = true;
      }
      if (part_num < tail_part_num) {
        complete(std::move(p), -ENODATA);
        return;
      }
      pn = tail_part_num;
      if (pn < part_num) {
        ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                       << " pn=" << pn << " tid=" << tid << dendl;
        fifo->trim_part(pn++, max_part_size, std::nullopt,
                        false, tid, call(std::move(p)));
      } else {
        update = true;
        canceled = tail_part_num < part_num;
        fifo->trim_part(part_num, ofs, std::nullopt, exclusive, tid,
                        call(std::move(p)));
      }
      return;
    }

    // A part that no longer exists is treated as already trimmed.
    if (r == -ENOENT) {
      r = 0;
    }

    if (r < 0) {
      lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
                 << (update ? " update_meta " : " trim ") << "failed: r="
                 << r << " tid=" << tid << dendl;
      complete(std::move(p), r);
      return;
    }

    // A preceding part was trimmed: continue with the next one, or with the
    // target part once all preceding parts are done.
    if (!update) {
      ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                     << " handling preceding trim callback: tid=" << tid << dendl;
      retries = 0;
      if (pn < part_num) {
        ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                       << " pn=" << pn << " tid=" << tid << dendl;
        std::unique_lock l(fifo->m);
        const auto max_part_size = fifo->info.params.max_part_size;
        l.unlock();
        fifo->trim_part(pn++, max_part_size, std::nullopt,
                        false, tid, call(std::move(p)));
        return;
      }

      std::unique_lock l(fifo->m);
      const auto tail_part_num = fifo->info.tail_part_num;
      l.unlock();
      update = true;
      // `canceled` doubles as "metadata update still needed" here.
      canceled = tail_part_num < part_num;
      fifo->trim_part(part_num, ofs, std::nullopt, exclusive, tid,
                      call(std::move(p)));
      return;
    }

    // Target part trimmed (or a previous _update_meta returned): advance the
    // tail if it is still behind and our last attempt was canceled.
    ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                   << " handling update-needed callback: tid=" << tid << dendl;
    std::unique_lock l(fifo->m);
    auto tail_part_num = fifo->info.tail_part_num;
    auto objv = fifo->info.version;
    l.unlock();
    if ((tail_part_num < part_num) &&
        canceled) {
      if (retries > MAX_RACE_RETRIES) {
        lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
                   << " canceled too many times, giving up: tid=" << tid << dendl;
        complete(std::move(p), -EIO);
        return;
      }
      ++retries;
      fifo->_update_meta(fifo::update{}
                         .tail_part_num(part_num), objv, &canceled,
                         tid, call(std::move(p)));
    } else {
      complete(std::move(p), overshoot ? -ENODATA : 0);
    }
  }
};
1917
1918void FIFO::trim(std::string_view markstr, bool exclusive,
1919 lr::AioCompletion* c) {
1920 auto marker = to_marker(markstr);
1921 auto realmark = marker.value_or(::rgw::cls::fifo::marker{});
1922 std::unique_lock l(m);
1923 const auto hn = info.head_part_num;
1924 const auto max_part_size = info.params.max_part_size;
1925 const auto pn = info.tail_part_num;
1926 const auto part_oid = info.part_oid(pn);
1927 auto tid = ++next_tid;
1928 l.unlock();
1929 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1930 << " entering: tid=" << tid << dendl;
1931 auto trimmer = std::make_unique<Trimmer>(this, realmark.num, realmark.ofs,
1932 pn, exclusive, c, tid);
1933 if (!marker) {
1934 Trimmer::complete(std::move(trimmer), -EINVAL);
1935 return;
1936 }
1937 ++trimmer->pn;
1938 auto ofs = marker->ofs;
1939 if (marker->num > hn) {
1940 trimmer->reread = true;
1941 read_meta(tid, Trimmer::call(std::move(trimmer)));
1942 return;
1943 }
1944 if (pn < marker->num) {
1945 ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
1946 << " pn=" << pn << " tid=" << tid << dendl;
1947 ofs = max_part_size;
1948 } else {
1949 trimmer->update = true;
1950 }
1951 trim_part(pn, ofs, std::nullopt, exclusive,
1952 tid, Trimmer::call(std::move(trimmer)));
1953}
1954
1955int FIFO::get_part_info(int64_t part_num,
1956 fifo::part_header* header,
1957 optional_yield y)
1958{
1959 std::unique_lock l(m);
1960 const auto part_oid = info.part_oid(part_num);
1961 auto tid = ++next_tid;
1962 l.unlock();
1963 auto r = rgw::cls::fifo::get_part_info(ioctx, part_oid, header, tid, y);
1964 if (r < 0) {
1965 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1966 << " get_part_info failed: r="
1967 << r << " tid=" << tid << dendl;
1968 }
1969 return r;
1970}
1971
1972void FIFO::get_part_info(int64_t part_num,
1973 fifo::part_header* header,
1974 lr::AioCompletion* c)
1975{
1976 std::unique_lock l(m);
1977 const auto part_oid = info.part_oid(part_num);
1978 auto tid = ++next_tid;
1979 l.unlock();
1980 auto op = rgw::cls::fifo::get_part_info(cct, header, tid);
1981 auto r = ioctx.aio_operate(part_oid, c, &op, nullptr);
1982 ceph_assert(r >= 0);
1983}
1984
1985struct InfoGetter : Completion<InfoGetter> {
1986 FIFO* fifo;
1987 fifo::part_header header;
1988 fu2::function<void(int r, fifo::part_header&&)> f;
1989 std::uint64_t tid;
1990 bool headerread = false;
1991
1992 InfoGetter(FIFO* fifo, fu2::function<void(int r, fifo::part_header&&)> f,
1993 std::uint64_t tid, lr::AioCompletion* super)
1994 : Completion(super), fifo(fifo), f(std::move(f)), tid(tid) {}
1995 void handle(Ptr&& p, int r) {
1996 if (!headerread) {
1997 if (r < 0) {
1998 lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
1999 << " read_meta failed: r="
2000 << r << " tid=" << tid << dendl;
2001 if (f)
2002 f(r, {});
2003 complete(std::move(p), r);
2004 return;
2005 }
2006
2007 auto info = fifo->meta();
2008 auto hpn = info.head_part_num;
2009 if (hpn < 0) {
2010 ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2011 << " no head, returning empty partinfo r="
2012 << r << " tid=" << tid << dendl;
2013 if (f)
2014 f(0, {});
2015 complete(std::move(p), r);
2016 return;
2017 }
2018 headerread = true;
2019 auto op = rgw::cls::fifo::get_part_info(fifo->cct, &header, tid);
2020 std::unique_lock l(fifo->m);
2021 auto oid = fifo->info.part_oid(hpn);
2022 l.unlock();
2023 r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op,
2024 nullptr);
2025 ceph_assert(r >= 0);
2026 return;
2027 }
2028
2029 if (r < 0) {
2030 lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
2031 << " get_part_info failed: r="
2032 << r << " tid=" << tid << dendl;
2033 }
2034
2035 if (f)
2036 f(r, std::move(header));
2037 complete(std::move(p), r);
2038 return;
2039 }
2040};
2041
2042void FIFO::get_head_info(fu2::unique_function<void(int r,
2043 fifo::part_header&&)> f,
2044 lr::AioCompletion* c)
2045{
2046 std::unique_lock l(m);
2047 auto tid = ++next_tid;
2048 l.unlock();
2049 auto ig = std::make_unique<InfoGetter>(this, std::move(f), tid, c);
2050 read_meta(tid, InfoGetter::call(std::move(ig)));
2051}
2052
2053struct JournalProcessor : public Completion<JournalProcessor> {
2054private:
2055 FIFO* const fifo;
2056
2057 std::vector<fifo::journal_entry> processed;
2058 std::multimap<std::int64_t, fifo::journal_entry> journal;
2059 std::multimap<std::int64_t, fifo::journal_entry>::iterator iter;
2060 std::int64_t new_tail;
2061 std::int64_t new_head;
2062 std::int64_t new_max;
2063 int race_retries = 0;
2064 bool first_pp = true;
2065 bool canceled = false;
2066 std::uint64_t tid;
2067
2068 enum {
2069 entry_callback,
2070 pp_callback,
2071 } state;
2072
2073 void create_part(Ptr&& p, int64_t part_num,
2074 std::string_view tag) {
2075 ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2076 << " entering: tid=" << tid << dendl;
2077 state = entry_callback;
2078 lr::ObjectWriteOperation op;
2079 op.create(false); /* We don't need exclusivity, part_init ensures
2080 we're creating from the same journal entry. */
2081 std::unique_lock l(fifo->m);
2082 part_init(&op, tag, fifo->info.params);
2083 auto oid = fifo->info.part_oid(part_num);
2084 l.unlock();
2085 auto r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op);
2086 ceph_assert(r >= 0);
2087 return;
2088 }
2089
2090 void remove_part(Ptr&& p, int64_t part_num,
2091 std::string_view tag) {
2092 ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2093 << " entering: tid=" << tid << dendl;
2094 state = entry_callback;
2095 lr::ObjectWriteOperation op;
2096 op.remove();
2097 std::unique_lock l(fifo->m);
2098 auto oid = fifo->info.part_oid(part_num);
2099 l.unlock();
2100 auto r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op);
2101 ceph_assert(r >= 0);
2102 return;
2103 }
2104
2105 void finish_je(Ptr&& p, int r,
2106 const fifo::journal_entry& entry) {
2107 ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2108 << " entering: tid=" << tid << dendl;
2109
2110 ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2111 << " finishing entry: entry=" << entry
2112 << " tid=" << tid << dendl;
2113
2114 if (entry.op == fifo::journal_entry::Op::remove && r == -ENOENT)
2115 r = 0;
2116
2117 if (r < 0) {
2118 lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
2119 << " processing entry failed: entry=" << entry
2120 << " r=" << r << " tid=" << tid << dendl;
2121 complete(std::move(p), r);
2122 return;
2123 } else {
2124 switch (entry.op) {
2125 case fifo::journal_entry::Op::unknown:
2126 case fifo::journal_entry::Op::set_head:
2127 // Can't happen. Filtered out in process.
2128 complete(std::move(p), -EIO);
2129 return;
2130
2131 case fifo::journal_entry::Op::create:
2132 if (entry.part_num > new_max) {
2133 new_max = entry.part_num;
2134 }
2135 break;
2136 case fifo::journal_entry::Op::remove:
2137 if (entry.part_num >= new_tail) {
2138 new_tail = entry.part_num + 1;
2139 }
2140 break;
2141 }
2142 processed.push_back(entry);
2143 }
2144 ++iter;
2145 process(std::move(p));
2146 }
2147
2148 void postprocess(Ptr&& p) {
2149 ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2150 << " entering: tid=" << tid << dendl;
2151 if (processed.empty()) {
2152 ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2153 << " nothing to update any more: race_retries="
2154 << race_retries << " tid=" << tid << dendl;
2155 complete(std::move(p), 0);
2156 return;
2157 }
2158 pp_run(std::move(p), 0, false);
2159 }
2160
2161public:
2162
2163 JournalProcessor(FIFO* fifo, std::uint64_t tid, lr::AioCompletion* super)
2164 : Completion(super), fifo(fifo), tid(tid) {
2165 std::unique_lock l(fifo->m);
2166 journal = fifo->info.journal;
2167 iter = journal.begin();
2168 new_tail = fifo->info.tail_part_num;
2169 new_head = fifo->info.head_part_num;
2170 new_max = fifo->info.max_push_part_num;
2171 }
2172
2173 void pp_run(Ptr&& p, int r, bool canceled) {
2174 ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2175 << " entering: tid=" << tid << dendl;
2176 std::optional<int64_t> tail_part_num;
2177 std::optional<int64_t> head_part_num;
2178 std::optional<int64_t> max_part_num;
2179
2180 if (r < 0) {
2181 lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
2182 << " failed, r=: " << r << " tid=" << tid << dendl;
2183 complete(std::move(p), r);
2184 }
2185
2186
2187 ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2188 << " postprocessing: race_retries="
2189 << race_retries << " tid=" << tid << dendl;
2190
2191 if (!first_pp && r == 0 && !canceled) {
2192 ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2193 << " nothing to update any more: race_retries="
2194 << race_retries << " tid=" << tid << dendl;
2195 complete(std::move(p), 0);
2196 return;
2197 }
2198
2199 first_pp = false;
2200
2201 if (canceled) {
2202 if (race_retries >= MAX_RACE_RETRIES) {
2203 lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
2204 << " canceled too many times, giving up: tid="
2205 << tid << dendl;
2206 complete(std::move(p), -ECANCELED);
2207 return;
2208 }
2209 ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2210 << " update canceled, retrying: race_retries="
2211 << race_retries << " tid=" << tid << dendl;
2212
2213 ++race_retries;
2214
2215 std::vector<fifo::journal_entry> new_processed;
2216 std::unique_lock l(fifo->m);
2217 for (auto& e : processed) {
2218 auto jiter = fifo->info.journal.find(e.part_num);
2219 /* journal entry was already processed */
2220 if (jiter == fifo->info.journal.end() ||
2221 !(jiter->second == e)) {
2222 continue;
2223 }
2224 new_processed.push_back(e);
2225 }
2226 processed = std::move(new_processed);
2227 }
2228
2229 std::unique_lock l(fifo->m);
2230 auto objv = fifo->info.version;
2231 if (new_tail > fifo->info.tail_part_num) {
2232 tail_part_num = new_tail;
2233 }
2234
2235 if (new_head > fifo->info.head_part_num) {
2236 head_part_num = new_head;
2237 }
2238
2239 if (new_max > fifo->info.max_push_part_num) {
2240 max_part_num = new_max;
2241 }
2242 l.unlock();
2243
2244 if (processed.empty() &&
2245 !tail_part_num &&
2246 !max_part_num) {
2247 /* nothing to update anymore */
2248 ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2249 << " nothing to update any more: race_retries="
2250 << race_retries << " tid=" << tid << dendl;
2251 complete(std::move(p), 0);
2252 return;
2253 }
2254 state = pp_callback;
2255 fifo->_update_meta(fifo::update{}
2256 .tail_part_num(tail_part_num)
2257 .head_part_num(head_part_num)
2258 .max_push_part_num(max_part_num)
2259 .journal_entries_rm(processed),
2260 objv, &this->canceled, tid, call(std::move(p)));
2261 return;
2262 }
2263
2264 JournalProcessor(const JournalProcessor&) = delete;
2265 JournalProcessor& operator =(const JournalProcessor&) = delete;
2266 JournalProcessor(JournalProcessor&&) = delete;
2267 JournalProcessor& operator =(JournalProcessor&&) = delete;
2268
2269 void process(Ptr&& p) {
2270 ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2271 << " entering: tid=" << tid << dendl;
2272 while (iter != journal.end()) {
2273 ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2274 << " processing entry: entry=" << *iter
2275 << " tid=" << tid << dendl;
2276 const auto entry = iter->second;
2277 switch (entry.op) {
2278 case fifo::journal_entry::Op::create:
2279 create_part(std::move(p), entry.part_num, entry.part_tag);
2280 return;
2281 case fifo::journal_entry::Op::set_head:
2282 if (entry.part_num > new_head) {
2283 new_head = entry.part_num;
2284 }
2285 processed.push_back(entry);
2286 ++iter;
2287 continue;
2288 case fifo::journal_entry::Op::remove:
2289 remove_part(std::move(p), entry.part_num, entry.part_tag);
2290 return;
2291 default:
2292 lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
2293 << " unknown journaled op: entry=" << entry << " tid="
2294 << tid << dendl;
2295 complete(std::move(p), -EIO);
2296 return;
2297 }
2298 }
2299 postprocess(std::move(p));
2300 return;
2301 }
2302
2303 void handle(Ptr&& p, int r) {
2304 ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
2305 << " entering: tid=" << tid << dendl;
2306 switch (state) {
2307 case entry_callback:
2308 finish_je(std::move(p), r, iter->second);
2309 return;
2310 case pp_callback:
2311 auto c = canceled;
2312 canceled = false;
2313 pp_run(std::move(p), r, c);
2314 return;
2315 }
2316
2317 abort();
2318 }
2319
2320};
2321
2322void FIFO::process_journal(std::uint64_t tid, lr::AioCompletion* c) {
2323 auto p = std::make_unique<JournalProcessor>(this, tid, c);
2324 p->process(std::move(p));
2325}
2326
2327struct Lister : Completion<Lister> {
2328 FIFO* f;
2329 std::vector<list_entry> result;
2330 bool more = false;
2331 std::int64_t part_num;
2332 std::uint64_t ofs;
2333 int max_entries;
2334 int r_out = 0;
2335 std::vector<fifo::part_list_entry> entries;
2336 bool part_more = false;
2337 bool part_full = false;
2338 std::vector<list_entry>* entries_out;
2339 bool* more_out;
2340 std::uint64_t tid;
2341
2342 bool read = false;
2343
2344 void complete(Ptr&& p, int r) {
2345 if (r >= 0) {
2346 if (more_out) *more_out = more;
2347 if (entries_out) *entries_out = std::move(result);
2348 }
2349 Completion::complete(std::move(p), r);
2350 }
2351
2352public:
2353 Lister(FIFO* f, std::int64_t part_num, std::uint64_t ofs, int max_entries,
2354 std::vector<list_entry>* entries_out, bool* more_out,
2355 std::uint64_t tid, lr::AioCompletion* super)
2356 : Completion(super), f(f), part_num(part_num), ofs(ofs), max_entries(max_entries),
2357 entries_out(entries_out), more_out(more_out), tid(tid) {
2358 result.reserve(max_entries);
2359 }
2360
2361 Lister(const Lister&) = delete;
2362 Lister& operator =(const Lister&) = delete;
2363 Lister(Lister&&) = delete;
2364 Lister& operator =(Lister&&) = delete;
2365
2366 void handle(Ptr&& p, int r) {
2367 if (read)
2368 handle_read(std::move(p), r);
2369 else
2370 handle_list(std::move(p), r);
2371 }
2372
2373 void list(Ptr&& p) {
2374 if (max_entries > 0) {
2375 part_more = false;
2376 part_full = false;
2377 entries.clear();
2378
2379 std::unique_lock l(f->m);
2380 auto part_oid = f->info.part_oid(part_num);
2381 l.unlock();
2382
2383 read = false;
2384 auto op = list_part(f->cct, {}, ofs, max_entries, &r_out,
2385 &entries, &part_more, &part_full,
2386 nullptr, tid);
2387 f->ioctx.aio_operate(part_oid, call(std::move(p)), &op, nullptr);
2388 } else {
2389 complete(std::move(p), 0);
2390 }
2391 }
2392
2393 void handle_read(Ptr&& p, int r) {
2394 read = false;
2395 if (r >= 0) r = r_out;
2396 r_out = 0;
2397
2398 if (r < 0) {
2399 complete(std::move(p), r);
2400 return;
2401 }
2402
2403 if (part_num < f->info.tail_part_num) {
2404 /* raced with trim? restart */
2405 max_entries += result.size();
2406 result.clear();
2407 part_num = f->info.tail_part_num;
2408 ofs = 0;
2409 list(std::move(p));
2410 return;
2411 }
2412 /* assuming part was not written yet, so end of data */
2413 more = false;
2414 complete(std::move(p), 0);
2415 return;
2416 }
2417
2418 void handle_list(Ptr&& p, int r) {
2419 if (r >= 0) r = r_out;
2420 r_out = 0;
2421 std::unique_lock l(f->m);
2422 auto part_oid = f->info.part_oid(part_num);
2423 l.unlock();
2424 if (r == -ENOENT) {
2425 read = true;
2426 f->read_meta(tid, call(std::move(p)));
2427 return;
2428 }
2429 if (r < 0) {
2430 complete(std::move(p), r);
2431 return;
2432 }
2433
2434 more = part_full || part_more;
2435 for (auto& entry : entries) {
2436 list_entry e;
2437 e.data = std::move(entry.data);
2438 e.marker = marker{part_num, entry.ofs}.to_string();
2439 e.mtime = entry.mtime;
2440 result.push_back(std::move(e));
2441 }
2442 max_entries -= entries.size();
2443 entries.clear();
2444 if (max_entries > 0 && part_more) {
2445 list(std::move(p));
2446 return;
2447 }
2448
2449 if (!part_full) { /* head part is not full */
2450 complete(std::move(p), 0);
2451 return;
2452 }
2453 ++part_num;
2454 ofs = 0;
2455 list(std::move(p));
2456 }
2457};
2458
2459void FIFO::list(int max_entries,
2460 std::optional<std::string_view> markstr,
2461 std::vector<list_entry>* out,
2462 bool* more,
2463 lr::AioCompletion* c) {
2464 std::unique_lock l(m);
2465 auto tid = ++next_tid;
2466 std::int64_t part_num = info.tail_part_num;
2467 l.unlock();
2468 std::uint64_t ofs = 0;
2469 std::optional<::rgw::cls::fifo::marker> marker;
2470
2471 if (markstr) {
2472 marker = to_marker(*markstr);
2473 if (marker) {
2474 part_num = marker->num;
2475 ofs = marker->ofs;
2476 }
2477 }
2478
2479 auto ls = std::make_unique<Lister>(this, part_num, ofs, max_entries, out,
2480 more, tid, c);
2481 if (markstr && !marker) {
2482 auto l = ls.get();
2483 l->complete(std::move(ls), -EINVAL);
2484 } else {
2485 ls->list(std::move(ls));
2486 }
2487}
2488}