]> git.proxmox.com Git - ceph.git/blame - ceph/src/neorados/cls/fifo.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / neorados / cls / fifo.h
CommitLineData
f67539c2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2020 Red Hat <contact@redhat.com>
7 * Author: Adam C. Emerson
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16#ifndef CEPH_NEORADOS_CLS_FIFIO_H
17#define CEPH_NEORADOS_CLS_FIFIO_H
18
#include <cstdint>
#include <deque>
#include <limits>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <string_view>
#include <vector>

#include <boost/asio.hpp>
#include <boost/system/error_code.hpp>

#undef FMT_HEADER_ONLY
#define FMT_HEADER_ONLY 1
#include <fmt/format.h>

#include "include/neorados/RADOS.hpp"
#include "include/buffer.h"

#include "common/allocate_unique.h"
#include "common/async/bind_handler.h"
#include "common/async/bind_like.h"
#include "common/async/completion.h"
#include "common/async/forward_handler.h"

#include "common/dout.h"

#include "cls/fifo/cls_fifo_types.h"
#include "cls/fifo/cls_fifo_ops.h"
48
49namespace neorados::cls::fifo {
50namespace ba = boost::asio;
51namespace bs = boost::system;
52namespace ca = ceph::async;
53namespace cb = ceph::buffer;
54namespace fifo = rados::cls::fifo;
55
56inline constexpr auto dout_subsys = ceph_subsys_rados;
57inline constexpr std::uint64_t default_max_part_size = 4 * 1024 * 1024;
58inline constexpr std::uint64_t default_max_entry_size = 32 * 1024;
59inline constexpr auto MAX_RACE_RETRIES = 10;
60
61
62const boost::system::error_category& error_category() noexcept;
63
64enum class errc {
65 raced = 1,
66 inconsistency,
67 entry_too_large,
68 invalid_marker,
69 update_failed
70};
71}
72
73namespace boost::system {
74template<>
75struct is_error_code_enum<::neorados::cls::fifo::errc> {
76 static const bool value = true;
77};
78template<>
79struct is_error_condition_enum<::neorados::cls::fifo::errc> {
80 static const bool value = false;
81};
82}
83
84namespace neorados::cls::fifo {
85// explicit conversion:
86inline bs::error_code make_error_code(errc e) noexcept {
87 return { static_cast<int>(e), error_category() };
88}
89
90inline bs::error_code make_error_category(errc e) noexcept {
91 return { static_cast<int>(e), error_category() };
92}
93
94void create_meta(WriteOp& op, std::string_view id,
95 std::optional<fifo::objv> objv,
96 std::optional<std::string_view> oid_prefix,
97 bool exclusive = false,
98 std::uint64_t max_part_size = default_max_part_size,
99 std::uint64_t max_entry_size = default_max_entry_size);
100void get_meta(ReadOp& op, std::optional<fifo::objv> objv,
101 bs::error_code* ec_out, fifo::info* info,
102 std::uint32_t* part_header_size,
103 std::uint32_t* part_entry_overhead);
104
105void update_meta(WriteOp& op, const fifo::objv& objv,
106 const fifo::update& desc);
107
108void part_init(WriteOp& op, std::string_view tag,
109 fifo::data_params params);
110
111void push_part(WriteOp& op, std::string_view tag,
112 std::deque<cb::list> data_bufs,
113 fu2::unique_function<void(bs::error_code, int)>);
114void trim_part(WriteOp& op, std::optional<std::string_view> tag,
115 std::uint64_t ofs,
116 bool exclusive);
117void list_part(ReadOp& op,
118 std::optional<std::string_view> tag,
119 std::uint64_t ofs,
120 std::uint64_t max_entries,
121 bs::error_code* ec_out,
122 std::vector<fifo::part_list_entry>* entries,
123 bool* more,
124 bool* full_part,
125 std::string* ptag);
126void get_part_info(ReadOp& op,
127 bs::error_code* out_ec,
128 fifo::part_header* header);
129
130struct marker {
131 std::int64_t num = 0;
132 std::uint64_t ofs = 0;
133
134 marker() = default;
135 marker(std::int64_t num, std::uint64_t ofs) : num(num), ofs(ofs) {}
136 static marker max() {
137 return { std::numeric_limits<decltype(num)>::max(),
138 std::numeric_limits<decltype(ofs)>::max() };
139 }
140
141 std::string to_string() {
142 return fmt::format("{:0>20}:{:0>20}", num, ofs);
143 }
144};
145
146struct list_entry {
147 cb::list data;
148 std::string marker;
149 ceph::real_time mtime;
150};
151
152using part_info = fifo::part_header;
153
154namespace detail {
155template<typename Handler>
156class JournalProcessor;
157}
158
159/// Completions, Handlers, and CompletionTokens
160/// ===========================================
161///
162/// This class is based on Boost.Asio. For information, see
163/// https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio.html
164///
/// In summary, Asio's design is that of functions taking completion
166/// handlers. Every handler has a signature, like
167/// (boost::system::error_code, std::string). The completion handler
168/// receives the result of the function, and the signature is the type
169/// of that result.
170///
171/// The completion handler is specified with a CompletionToken. The
172/// CompletionToken is any type that has a specialization of
/// async_completion and async_result. See
174/// https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/async_completion.html
175/// and https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/async_result.html
176///
177/// The return type of a function taking a CompletionToken is
178/// async_result<CompletionToken, Signature>::return_type.
179///
180/// Functions
181/// ---------
182///
183/// The default implementations treat whatever value is described as a
184/// function, whose parameters correspond to the signature, and calls
185/// it upon completion.
186///
187/// EXAMPLE:
188/// Let f be an asynchronous function whose signature is (bs::error_code, int)
189/// Let g be an asynchronous function whose signature is
190/// (bs::error_code, int, std::string).
191///
192///
193/// f([](bs::error_code ec, int i) { ... });
194/// g([](bs::error_code ec, int i, std::string s) { ... });
195///
196/// Will schedule asynchronous tasks, and the provided lambdas will be
197/// called on completion. In this case, f and g return void.
198///
/// There are other specializations. Commonly used ones are:
200///
201/// Futures
202/// -------
203///
204/// A CompletionToken of boost::asio::use_future will complete with a
205/// promise whose type matches (minus any initial error_code) the
206/// function's signature. The corresponding future is returned. If the
207/// error_code of the result is non-zero, the future is set with an
208/// exception of type boost::asio::system_error.
209///
210/// See https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/use_future_t.html
211///
212/// EXAMPLE:
213///
/// std::future<int> fi = f(ba::use_future);
/// std::future<std::tuple<int, std::string>> fs = g(ba::use_future);
216///
217/// Coroutines
218/// ----------
219///
220/// A CompletionToken of type spawn::yield_context suspends execution
221/// of the current coroutine until completion of the operation. See
222/// src/spawn/README.md
223/// https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/spawn.html and
224/// https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/yield_context.html
225///
226/// Operations given this CompletionToken return their results, modulo
227/// any leading error_code. A non-zero error code will be thrown, by
228/// default, but may be bound to a variable instead with the overload
/// of the array-subscript operator.
230///
231/// EXAMPLE:
232/// // Within a function with a yield_context parameter named y
233///
234/// try {
235/// int i = f(y);
236/// } catch (const bs::system_error& ec) { ... }
237///
238/// bs::error_code ec;
239/// auto [i, s] = g(y[ec]);
240///
241/// Blocking calls
242/// --------------
243///
244/// ceph::async::use_blocked, defined in src/common/async/blocked_completion.h
245/// Suspends the current thread of execution, returning the results of
246/// the operation on resumption. Its calling convention is analogous to
247/// that of yield_context.
248///
249/// EXAMPLE:
250/// try {
251/// int i = f(ca::use_blocked);
252/// } catch (const bs::system_error& e) { ... }
253///
254/// bs::error_code ec;
255/// auto [i, s] = g(ca::use_blocked[ec]);
256///
257/// librados Completions
258/// --------------------
259///
260/// If src/common/async/librados_completion.h is included in the
261/// current translation unit, then librados::AioCompletion* may be used
262/// as a CompletionToken. This is only permitted when the completion
/// signature is either void(bs::error_code) or void(). The return type of
264/// functions provided a CompletionToken of AioCompletion* is void. If
265/// the signature includes an error code and the error code is set,
266/// then the error is translated to an int which is set as the result
267/// of the AioCompletion.
268///
269/// EXAMPLE:
270/// // Assume an asynchronous function h whose signature is bs::error_code.
271///
272/// AioCompletion* c = Rados::aio_create_completion();
273/// h(c);
/// c->wait_for_complete();
/// int r = c->get_return_value();
275///
276/// See also src/test/cls_fifo/bench_cls_fifo.cc for a full, simple
277/// example of a program using this class with coroutines.
278///
279///
280/// Markers
281/// =======
282///
283/// Markers represent a position within the FIFO. Internally, they are
284/// part/offset pairs. Externally, they are ordered but otherwise
285/// opaque strings. Markers that compare lower denote positions closer
286/// to the tail.
287///
288/// A marker is returned with every entry from a list() operation. They
289/// may be supplied to a list operation to resume from a given
/// position, and must be supplied to trim() to give the position to which
291/// to trim.
292
293class FIFO {
294public:
295
296 FIFO(const FIFO&) = delete;
297 FIFO& operator =(const FIFO&) = delete;
298 FIFO(FIFO&&) = delete;
299 FIFO& operator =(FIFO&&) = delete;
300
/// Open an existing FIFO.
/// Signature: (bs::error_code ec, std::unique_ptr<FIFO> f)
///
/// Reads the metadata object `oid`. If the metadata has journal entries,
/// they are processed before the handler runs. The handler receives a FIFO
/// object even when `ec` is set, so callers must check `ec` before using it.
template<typename CT>
static auto open(RADOS& r, //< RADOS handle
                 const IOContext& ioc, //< Context for pool, namespace, etc.
                 Object oid, //< OID for the 'main' object of the FIFO
                 CT&& ct, //< CompletionToken
                 /// Fail if is not this version
                 std::optional<fifo::objv> objv = std::nullopt,
                 /// Default executor. By default use the one
                 /// associated with the RADOS handle.
                 std::optional<ba::executor> executor = std::nullopt) {
  ba::async_completion<CT, void(bs::error_code,
                                std::unique_ptr<FIFO>)> init(ct);
  auto e = ba::get_associated_executor(init.completion_handler,
                                       executor.value_or(r.get_executor()));
  auto a = ba::get_associated_allocator(init.completion_handler);
  _read_meta_(
    &r, oid, ioc, objv,
    ca::bind_ea(
      e, a,
      // `r` is captured by reference: the RADOS handle must outlive this
      // operation and the FIFO, which stores a pointer to it.
      [&r, ioc, oid, executor, handler = std::move(init.completion_handler)]
      (bs::error_code ec, fifo::info info,
       std::uint32_t size, std::uint32_t over) mutable {
        std::unique_ptr<FIFO> f(
          new FIFO(r, ioc, oid, executor.value_or(r.get_executor())));
        f->info = info;
        f->part_header_size = size;
        f->part_entry_overhead = over;
        // If there are journal entries, process them, in case
        // someone crashed mid-transaction.
        if (!ec && !info.journal.empty()) {
          auto e = ba::get_associated_executor(handler, f->get_executor());
          auto a = ba::get_associated_allocator(handler);
          // Keep a raw pointer for the call: ownership of `f` moves into the
          // completion, which hands it to `handler` when processing ends.
          auto g = f.get();
          g->_process_journal(
            ca::bind_ea(
              e, a,
              [f = std::move(f),
               handler = std::move(handler)](bs::error_code ec) mutable {
                std::move(handler)(ec, std::move(f));
              }));
          return;
        }
        std::move(handler)(ec, std::move(f));
        return;
      }));
  return init.result.get();
}
350
/// Open an existing or create a new FIFO.
/// Signature: (bs::error_code ec, std::unique_ptr<FIFO> f)
///
/// Writes the metadata object with create_meta(). If that write fails, the
/// handler gets (ec, nullptr). Otherwise the metadata is read back and the
/// FIFO is set up as in open(), including processing any journal entries.
/// As with open(), a FIFO object is delivered even if that read fails, so
/// callers must check `ec` first.
template<typename CT>
static auto create(RADOS& r, /// RADOS handle
                   const IOContext& ioc, /// Context for pool, namespace, etc.
                   Object oid, /// OID for the 'main' object of the FIFO
                   CT&& ct, /// CompletionToken
                   /// Fail if FIFO exists and is not this version
                   std::optional<fifo::objv> objv = std::nullopt,
                   /// Custom prefix for parts
                   std::optional<std::string_view> oid_prefix = std::nullopt,
                   /// Fail if FIFO already exists
                   bool exclusive = false,
                   /// Size at which a part is considered full
                   std::uint64_t max_part_size = default_max_part_size,
                   /// Maximum size of any entry
                   std::uint64_t max_entry_size = default_max_entry_size,
                   /// Default executor. By default use the one
                   /// associated with the RADOS handle.
                   std::optional<ba::executor> executor = std::nullopt) {
  ba::async_completion<CT, void(bs::error_code,
                                std::unique_ptr<FIFO>)> init(ct);
  WriteOp op;
  create_meta(op, oid, objv, oid_prefix, exclusive, max_part_size,
              max_entry_size);
  auto e = ba::get_associated_executor(init.completion_handler,
                                       executor.value_or(r.get_executor()));
  auto a = ba::get_associated_allocator(init.completion_handler);
  r.execute(
    oid, ioc, std::move(op),
    ca::bind_ea(
      e, a,
      [objv, &r, ioc, oid, executor, handler = std::move(init.completion_handler)]
      (bs::error_code ec) mutable {
        // Creation failed: there is no FIFO to hand out.
        if (ec) {
          std::move(handler)(ec, nullptr);
          return;
        }
        auto e = ba::get_associated_executor(
          handler, executor.value_or(r.get_executor()));
        auto a = ba::get_associated_allocator(handler);
        // Read back the metadata just written (or already present) and
        // build the FIFO from it.
        FIFO::_read_meta_(
          &r, oid, ioc, objv,
          ca::bind_ea(
            e, a,
            [&r, ioc, executor, oid, handler = std::move(handler)]
            (bs::error_code ec, fifo::info info,
             std::uint32_t size, std::uint32_t over) mutable {
              std::unique_ptr<FIFO> f(
                new FIFO(r, ioc, oid, executor.value_or(r.get_executor())));
              f->info = info;
              f->part_header_size = size;
              f->part_entry_overhead = over;
              // Process leftover journal entries before handing out the FIFO.
              if (!ec && !info.journal.empty()) {
                auto e = ba::get_associated_executor(handler,
                                                     f->get_executor());
                auto a = ba::get_associated_allocator(handler);
                auto g = f.get();
                g->_process_journal(
                  ca::bind_ea(
                    e, a,
                    [f = std::move(f), handler = std::move(handler)]
                    (bs::error_code ec) mutable {
                      std::move(handler)(ec, std::move(f));
                    }));
                return;
              }
              std::move(handler)(ec, std::move(f));
            }));
      }));
  return init.result.get();
}
423
424 /// Force a re-read of FIFO metadata.
425 /// Signature: (bs::error_code ec)
426 template<typename CT>
427 auto read_meta(CT&& ct, //< CompletionToken
428 /// Fail if FIFO not at this version
429 std::optional<fifo::objv> objv = std::nullopt) {
430 std::unique_lock l(m);
431 auto version = info.version;
432 l.unlock();
433 ba::async_completion<CT, void(bs::error_code)> init(ct);
434 auto e = ba::get_associated_executor(init.completion_handler,
435 get_executor());
436 auto a = ba::get_associated_allocator(init.completion_handler);
437 _read_meta_(
438 r, oid, ioc, objv,
439 ca::bind_ea(
440 e, a,
441 [this, version, handler = std::move(init.completion_handler)]
442 (bs::error_code ec, fifo::info newinfo,
443 std::uint32_t size, std::uint32_t over) mutable {
444 std::unique_lock l(m);
445 if (version == info.version) {
446 info = newinfo;
447 part_header_size = size;
448 part_entry_overhead = over;
449 }
450 l.unlock();
451 return std::move(handler)(ec);
452 }));
453 return init.result.get();
454 }
455
456 /// Return a reference to currently known metadata
457 const fifo::info& meta() const {
458 return info;
459 }
460
461 /// Return header size and entry overhead of partitions.
462 std::pair<std::uint32_t, std::uint32_t> get_part_layout_info() {
463 return {part_header_size, part_entry_overhead};
464 }
465
466 /// Push a single entry to the FIFO.
467 /// Signature: (bs::error_code)
468 template<typename CT>
469 auto push(const cb::list& bl, //< Bufferlist holding entry to push
470 CT&& ct //< CompletionToken
471 ) {
472 return push(std::vector{ bl }, std::forward<CT>(ct));
473 }
474
475 /// Push a many entries to the FIFO.
476 /// Signature: (bs::error_code)
477 template<typename CT>
478 auto push(const std::vector<cb::list>& data_bufs, //< Entries to push
479 CT&& ct //< CompletionToken
480 ) {
481 ba::async_completion<CT, void(bs::error_code)> init(ct);
482 std::unique_lock l(m);
483 auto max_entry_size = info.params.max_entry_size;
484 auto need_new_head = info.need_new_head();
485 l.unlock();
486 auto e = ba::get_associated_executor(init.completion_handler,
487 get_executor());
488 auto a = ba::get_associated_allocator(init.completion_handler);
489 if (data_bufs.empty() ) {
490 // Can't fail if you don't try.
491 e.post(ca::bind_handler(std::move(init.completion_handler),
492 bs::error_code{}), a);
493 return init.result.get();
494 }
495
496 // Validate sizes
497 for (const auto& bl : data_bufs) {
498 if (bl.length() > max_entry_size) {
499 ldout(r->cct(), 10) << __func__ << "(): entry too large: "
500 << bl.length() << " > "
501 << info.params.max_entry_size << dendl;
502 e.post(ca::bind_handler(std::move(init.completion_handler),
503 errc::entry_too_large), a);
504 return init.result.get();
505 }
506 }
507
508 auto p = ca::bind_ea(e, a,
509 Pusher(this, {data_bufs.begin(), data_bufs.end()},
510 {}, 0, std::move(init.completion_handler)));
511
512 if (need_new_head) {
513 _prepare_new_head(std::move(p));
514 } else {
515 e.dispatch(std::move(p), a);
516 }
517 return init.result.get();
518 }
519
520 /// List the entries in a FIFO
521 /// Signature(bs::error_code ec, bs::vector<list_entry> entries, bool more)
522 ///
523 /// More is true if entries beyond the last exist.
524 /// The list entries are of the form:
525 /// data - Contents of the entry
526 /// marker - String representing the position of this entry within the FIFO.
527 /// mtime - Time (on the OSD) at which the entry was pushed.
528 template<typename CT>
529 auto list(int max_entries, //< Maximum number of entries to fetch
530 /// Optionally, a marker indicating the position after
531 /// which to begin listing. If null, begin at the tail.
532 std::optional<std::string_view> markstr,
533 CT&& ct //< CompletionToken
534 ) {
535 ba::async_completion<CT, void(bs::error_code,
536 std::vector<list_entry>, bool)> init(ct);
537 std::unique_lock l(m);
538 std::int64_t part_num = info.tail_part_num;
539 l.unlock();
540 std::uint64_t ofs = 0;
541 auto a = ba::get_associated_allocator(init.completion_handler);
542 auto e = ba::get_associated_executor(init.completion_handler);
543
544 if (markstr) {
545 auto marker = to_marker(*markstr);
546 if (!marker) {
547 ldout(r->cct(), 0) << __func__
548 << "(): failed to parse marker (" << *markstr
549 << ")" << dendl;
550 e.post(ca::bind_handler(std::move(init.completion_handler),
551 errc::invalid_marker,
552 std::vector<list_entry>{}, false), a);
553 return init.result.get();
554 }
555 part_num = marker->num;
556 ofs = marker->ofs;
557 }
558
559 using handler_type = decltype(init.completion_handler);
560 auto ls = ceph::allocate_unique<Lister<handler_type>>(
561 a, this, part_num, ofs, max_entries,
562 std::move(init.completion_handler));
563 ls.release()->list();
564 return init.result.get();
565 }
566
567 /// Trim entries from the tail to the given position
568 /// Signature: (bs::error_code)
569 template<typename CT>
570 auto trim(std::string_view markstr, //< Position to which to trim, inclusive
571 bool exclusive, //< If true, trim markers up to but NOT INCLUDING
572 //< markstr, otherwise trim markstr as well.
573 CT&& ct //< CompletionToken
574 ) {
575 auto m = to_marker(markstr);
576 ba::async_completion<CT, void(bs::error_code)> init(ct);
577 auto a = ba::get_associated_allocator(init.completion_handler);
578 auto e = ba::get_associated_executor(init.completion_handler);
579 if (!m) {
580 ldout(r->cct(), 0) << __func__ << "(): failed to parse marker: marker="
581 << markstr << dendl;
582 e.post(ca::bind_handler(std::move(init.completion_handler),
583 errc::invalid_marker), a);
584 return init.result.get();
585 } else {
586 using handler_type = decltype(init.completion_handler);
587 auto t = ceph::allocate_unique<Trimmer<handler_type>>(
588 a, this, m->num, m->ofs, exclusive, std::move(init.completion_handler));
589 t.release()->trim();
590 }
591 return init.result.get();
592 }
593
594 /// Get information about a specific partition
595 /// Signature: (bs::error_code, part_info)
596 ///
597 /// part_info has the following entries
598 /// tag - A random string identifying this partition. Used internally
599 /// as a sanity check to make sure operations haven't been misdirected
600 /// params - Data parameters, identical for every partition within a
601 /// FIFO and the same as what is returned from get_part_layout()
602 /// magic - A random magic number, used internally as a prefix to
603 /// every entry stored on the OSD to ensure sync
604 /// min_ofs - Offset of the first entry
605 /// max_ofs - Offset of the highest entry
606 /// min_index - Minimum entry index
607 /// max_index - Maximum entry index
608 /// max_time - Time of the latest push
609 ///
610 /// The difference between ofs and index is that ofs is a byte
611 /// offset. Index is a count. Nothing really uses indices, but
612 /// they're tracked and sanity-checked as an invariant on the OSD.
613 ///
614 /// max_ofs and max_time are the two that have been used externally
615 /// so far.
616 template<typename CT>
617 auto get_part_info(int64_t part_num, // The number of the partition
618 CT&& ct // CompletionToken
619 ) {
620
621 ba::async_completion<CT, void(bs::error_code, part_info)> init(ct);
622 fifo::op::get_part_info gpi;
623 cb::list in;
624 encode(gpi, in);
625 ReadOp op;
626 auto e = ba::get_associated_executor(init.completion_handler,
627 get_executor());
628 auto a = ba::get_associated_allocator(init.completion_handler);
629 auto reply = ceph::allocate_unique<
630 ExecDecodeCB<fifo::op::get_part_info_reply>>(a);
631
632 op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in,
633 std::ref(*reply));
634 std::unique_lock l(m);
635 auto part_oid = info.part_oid(part_num);
636 l.unlock();
637 r->execute(part_oid, ioc, std::move(op), nullptr,
638 ca::bind_ea(e, a,
639 PartInfoGetter(std::move(init.completion_handler),
640 std::move(reply))));
641 return init.result.get();
642 }
643
644 using executor_type = ba::executor;
645
646 /// Return the default executor, as specified at creation.
647 ba::executor get_executor() const {
648 return executor;
649 }
650
651private:
template<typename Handler>
friend class detail::JournalProcessor;
// RADOS handle passed to open()/create(); not owned, must outlive the FIFO.
RADOS* const r;
// Pool/namespace context used for the metadata object and the part objects.
const IOContext ioc;
// OID of the FIFO's 'main' (metadata) object.
const Object oid;
// Protects the cached metadata below (`info` and the part layout fields).
std::mutex m;

// Cached copy of the FIFO metadata.
fifo::info info;

// Part layout from the last metadata read; 0xdeadbeef is a placeholder
// until metadata has been read.
std::uint32_t part_header_size = 0xdeadbeef;
std::uint32_t part_entry_overhead = 0xdeadbeef;

// Default executor for completions; see get_executor().
ba::executor executor;

// Parse a marker string; std::nullopt if it cannot be parsed. Presumably
// the inverse of marker::to_string(); confirm against the definition.
std::optional<marker> to_marker(std::string_view s);
667
668 template<typename Handler, typename T>
669 static void assoc_delete(const Handler& handler, T* t) {
670 typename std::allocator_traits<typename ba::associated_allocator<Handler>::type>
671 ::template rebind_alloc<T> a(
672 ba::get_associated_allocator(handler));
673 a.destroy(t);
674 a.deallocate(t, 1);
675 }
676
677 FIFO(RADOS& r,
678 IOContext ioc,
679 Object oid,
680 ba::executor executor)
681 : r(&r), ioc(std::move(ioc)), oid(oid), executor(executor) {}
682
683 std::string generate_tag() const;
684
685 template <typename T>
686 struct ExecDecodeCB {
687 bs::error_code ec;
688 T result;
689 void operator()(bs::error_code e, const cb::list& r) {
690 if (e) {
691 ec = e;
692 return;
693 }
694 try {
695 auto p = r.begin();
696 using ceph::decode;
697 decode(result, p);
698 } catch (const cb::error& err) {
699 ec = err.code();
700 }
701 }
702 };
703
704 template<typename Handler>
705 class MetaReader {
706 Handler handler;
707 using allocator_type = boost::asio::associated_allocator_t<Handler>;
708 using decoder_type = ExecDecodeCB<fifo::op::get_meta_reply>;
709 using decoder_ptr = ceph::allocated_unique_ptr<decoder_type, allocator_type>;
710 decoder_ptr decoder;
711 public:
712 MetaReader(Handler&& handler, decoder_ptr&& decoder)
713 : handler(std::move(handler)), decoder(std::move(decoder)) {}
714
715 void operator ()(bs::error_code ec) {
716 if (!ec) {
717 ec = decoder->ec;
718 }
719 auto reply = std::move(decoder->result);
720 decoder.reset(); // free handler-allocated memory before dispatching
721
722 std::move(handler)(ec, std::move(reply.info),
723 std::move(reply.part_header_size),
724 std::move(reply.part_entry_overhead));
725 }
726 };
727
// Renamed to get around a compiler bug in Bionic that kept
// complaining we weren't capturing 'this' to make a static function call.
//
// Read the metadata object `oid` with a GET_META class call and complete
// `handler` with (error_code, info, part_header_size, part_entry_overhead).
// `objv`, when set, is sent with the request as the expected version.
// NOTE(review): the `executor` argument is not used. Completion runs on the
// handler's associated executor, or the system executor if it has none.
template<typename Handler>
static void _read_meta_(RADOS* r, const Object& oid, const IOContext& ioc,
                        std::optional<fifo::objv> objv,
                        Handler&& handler, /* error_code, info, header size,
                                              entry overhead */
                        std::optional<ba::executor> executor = std::nullopt){
  fifo::op::get_meta gm;

  gm.version = objv;

  cb::list in;
  encode(gm, in);
  ReadOp op;

  // The decoder lives in handler-allocated memory; MetaReader frees it
  // before invoking the handler.
  auto a = ba::get_associated_allocator(handler);
  auto reply =
    ceph::allocate_unique<ExecDecodeCB<fifo::op::get_meta_reply>>(a);

  auto e = ba::get_associated_executor(handler);
  op.exec(fifo::op::CLASS, fifo::op::GET_META, in, std::ref(*reply));
  r->execute(oid, ioc, std::move(op), nullptr,
             ca::bind_ea(e, a, MetaReader(std::move(handler),
                                          std::move(reply))));
};
754
755 template<typename Handler>
756 void _read_meta(Handler&& handler /* error_code */) {
757 auto e = ba::get_associated_executor(handler, get_executor());
758 auto a = ba::get_associated_allocator(handler);
759 _read_meta_(r, oid, ioc,
20effc67 760 std::nullopt,
f67539c2
TL
761 ca::bind_ea(
762 e, a,
763 [this,
764 handler = std::move(handler)](bs::error_code ec,
765 fifo::info&& info,
766 std::uint64_t phs,
767 std::uint64_t peo) mutable {
768 std::unique_lock l(m);
769 if (ec) {
770 l.unlock();
771 std::move(handler)(ec);
772 return;
773 }
774 // We have a newer version already!
775 if (!info.version.same_or_later(this->info.version)) {
776 l.unlock();
777 std::move(handler)(bs::error_code{});
778 return;
779 }
780 this->info = std::move(info);
781 part_header_size = phs;
782 part_entry_overhead = peo;
783 l.unlock();
784 std::move(handler)(bs::error_code{});
785 }), get_executor());
786 }
787
788 bs::error_code apply_update(fifo::info* info,
789 const fifo::objv& objv,
790 const fifo::update& update);
791
792
793 template<typename Handler>
794 void _update_meta(const fifo::update& update,
795 fifo::objv version,
796 Handler&& handler /* error_code, bool */) {
797 WriteOp op;
798
799 cls::fifo::update_meta(op, info.version, update);
800
801 auto a = ba::get_associated_allocator(handler);
802 auto e = ba::get_associated_executor(handler, get_executor());
803
804 r->execute(
805 oid, ioc, std::move(op),
806 ca::bind_ea(
807 e, a,
808 [this, e, a, version, update,
809 handler = std::move(handler)](bs::error_code ec) mutable {
810 if (ec && ec != bs::errc::operation_canceled) {
811 std::move(handler)(ec, bool{});
812 return;
813 }
814
815 auto canceled = (ec == bs::errc::operation_canceled);
816
817 if (!canceled) {
818 ec = apply_update(&info,
819 version,
820 update);
821 if (ec) {
822 canceled = true;
823 }
824 }
825
826 if (canceled) {
827 _read_meta(
828 ca::bind_ea(
829 e, a,
830 [handler = std::move(handler)](bs::error_code ec) mutable {
831 std::move(handler)(ec, ec ? false : true);
832 }));
833 return;
834 }
835 std::move(handler)(ec, false);
836 return;
837 }));
838 }
839
840 template<typename Handler>
841 auto _process_journal(Handler&& handler /* error_code */) {
842 auto a = ba::get_associated_allocator(std::ref(handler));
843 auto j = ceph::allocate_unique<detail::JournalProcessor<Handler>>(
844 a, this, std::move(handler));
845 auto p = j.release();
846 p->process();
847 }
848
// Completion for the journal update issued by _prepare_new_part().
// Receives (error_code, canceled) from _update_meta() and completes
// `handler` with an error_code. On success it processes the journal. After
// a lost race it finishes early if someone else already created the part,
// gives up with errc::raced after MAX_RACE_RETRIES, or re-submits the
// journal entries against the refreshed version.
template<typename Handler>
class NewPartPreparer {
  FIFO* f;
  Handler handler;
  // Journal entries this preparer is trying to add.
  std::vector<fifo::journal_entry> jentries;
  // Retries so far.
  int i;
  // Head part number the caller needs: the new part's number if it is to
  // become head, otherwise the head at the time of the request.
  std::int64_t new_head_part_num;

public:

  void operator ()(bs::error_code ec, bool canceled) {
    if (ec) {
      std::move(handler)(ec);
      return;
    }

    if (canceled) {
      // Snapshot the refreshed metadata under the lock.
      std::unique_lock l(f->m);
      auto iter = f->info.journal.find(jentries.front().part_num);
      auto max_push_part_num = f->info.max_push_part_num;
      auto head_part_num = f->info.head_part_num;
      auto version = f->info.version;
      auto found = (iter != f->info.journal.end());
      l.unlock();
      if ((max_push_part_num >= jentries.front().part_num &&
           head_part_num >= new_head_part_num)) {
        /* raced, but new part was already written */
        std::move(handler)(bs::error_code{});
        return;
      }
      if (i >= MAX_RACE_RETRIES) {
        std::move(handler)(errc::raced);
        return;
      }
      if (!found) {
        // Our entries are not journaled yet, so retry against the fresh
        // version. `jentries` is copied (not moved) into the next preparer
        // because the same call also reads it for journal_entries_add(),
        // and argument evaluation order is unspecified.
        auto e = ba::get_associated_executor(handler, f->get_executor());
        auto a = ba::get_associated_allocator(handler);
        f->_update_meta(fifo::update{}
                        .journal_entries_add(jentries),
                        version,
                        ca::bind_ea(
                          e, a,
                          NewPartPreparer(f, std::move(handler),
                                          jentries,
                                          i + 1, new_head_part_num)));
        return;
      }
      // Fall through. We still need to process the journal.
    }
    f->_process_journal(std::move(handler));
    return;
  }

  NewPartPreparer(FIFO* f,
                  Handler&& handler,
                  std::vector<fifo::journal_entry> jentries,
                  int i, std::int64_t new_head_part_num)
    : f(f), handler(std::move(handler)), jentries(std::move(jentries)),
      i(i), new_head_part_num(new_head_part_num) {}
};
909
910 template<typename Handler>
911 void _prepare_new_part(bool is_head,
912 Handler&& handler /* error_code */) {
913 std::unique_lock l(m);
914 std::vector jentries = { info.next_journal_entry(generate_tag()) };
915 std::int64_t new_head_part_num = info.head_part_num;
916 auto version = info.version;
917
918 if (is_head) {
919 auto new_head_jentry = jentries.front();
920 new_head_jentry.op = fifo::journal_entry::Op::set_head;
921 new_head_part_num = jentries.front().part_num;
922 jentries.push_back(std::move(new_head_jentry));
923 }
924 l.unlock();
925
926 auto e = ba::get_associated_executor(handler, get_executor());
927 auto a = ba::get_associated_allocator(handler);
928 _update_meta(fifo::update{}.journal_entries_add(jentries),
929 version,
930 ca::bind_ea(
931 e, a,
932 NewPartPreparer(this, std::move(handler),
933 jentries, 0, new_head_part_num)));
934 }
935
// Completion for the head_part_num update issued by _prepare_new_head().
// Receives (error_code, canceled) from _update_meta() and completes
// `handler` with an error_code. After a lost race it retries (up to
// MAX_RACE_RETRIES, then errc::raced) only while the refreshed head is
// still behind `new_head_num`.
template<typename Handler>
class NewHeadPreparer {
  FIFO* f;
  Handler handler;
  int i;                     // retries so far
  std::int64_t new_head_num; // head part number to reach

public:

  void operator ()(bs::error_code ec, bool canceled) {
    // Snapshot the (possibly refreshed) metadata under the lock.
    std::unique_lock l(f->m);
    auto head_part_num = f->info.head_part_num;
    auto version = f->info.version;
    l.unlock();

    if (ec) {
      std::move(handler)(ec);
      return;
    }
    if (canceled) {
      if (i >= MAX_RACE_RETRIES) {
        std::move(handler)(errc::raced);
        return;
      }

      // Raced, but there's still work to do!
      if (head_part_num < new_head_num) {
        auto e = ba::get_associated_executor(handler, f->get_executor());
        auto a = ba::get_associated_allocator(handler);
        f->_update_meta(fifo::update{}.head_part_num(new_head_num),
                        version,
                        ca::bind_ea(
                          e, a,
                          NewHeadPreparer(f, std::move(handler),
                                          i + 1,
                                          new_head_num)));
        return;
      }
    }
    // Either we succeeded, or we were raced by someone who did it for us.
    std::move(handler)(bs::error_code{});
    return;
  }

  NewHeadPreparer(FIFO* f,
                  Handler&& handler,
                  int i, std::int64_t new_head_num)
    : f(f), handler(std::move(handler)), i(i), new_head_num(new_head_num) {}
};
985
986 template<typename Handler>
987 void _prepare_new_head(Handler&& handler /* error_code */) {
988 std::unique_lock l(m);
989 int64_t new_head_num = info.head_part_num + 1;
990 auto max_push_part_num = info.max_push_part_num;
991 auto version = info.version;
992 l.unlock();
993
994 if (max_push_part_num < new_head_num) {
995 auto e = ba::get_associated_executor(handler, get_executor());
996 auto a = ba::get_associated_allocator(handler);
997 _prepare_new_part(
998 true,
999 ca::bind_ea(
1000 e, a,
1001 [this, new_head_num,
1002 handler = std::move(handler)](bs::error_code ec) mutable {
1003 if (ec) {
1004 handler(ec);
1005 return;
1006 }
1007 std::unique_lock l(m);
1008 if (info.max_push_part_num < new_head_num) {
1009 l.unlock();
1010 ldout(r->cct(), 0)
1011 << "ERROR: " << __func__
1012 << ": after new part creation: meta_info.max_push_part_num="
1013 << info.max_push_part_num << " new_head_num="
1014 << info.max_push_part_num << dendl;
1015 std::move(handler)(errc::inconsistency);
1016 } else {
1017 l.unlock();
1018 std::move(handler)(bs::error_code{});
1019 }
1020 }));
1021 return;
1022 }
1023 auto e = ba::get_associated_executor(handler, get_executor());
1024 auto a = ba::get_associated_allocator(handler);
1025 _update_meta(fifo::update{}.head_part_num(new_head_num),
1026 version,
1027 ca::bind_ea(
1028 e, a,
1029 NewHeadPreparer(this, std::move(handler), 0,
1030 new_head_num)));
1031 }
1032
1033 template<typename T>
1034 struct ExecHandleCB {
1035 bs::error_code ec;
1036 T result;
1037 void operator()(bs::error_code e, const T& t) {
1038 if (e) {
1039 ec = e;
1040 return;
1041 }
1042 result = t;
1043 }
1044 };
1045
1046 template<typename Handler>
1047 class EntryPusher {
1048 Handler handler;
1049 using allocator_type = boost::asio::associated_allocator_t<Handler>;
1050 using decoder_type = ExecHandleCB<int>;
1051 using decoder_ptr = ceph::allocated_unique_ptr<decoder_type, allocator_type>;
1052 decoder_ptr decoder;
1053
1054 public:
1055
1056 EntryPusher(Handler&& handler, decoder_ptr&& decoder)
1057 : handler(std::move(handler)), decoder(std::move(decoder)) {}
1058
1059 void operator ()(bs::error_code ec) {
1060 if (!ec) {
1061 ec = decoder->ec;
1062 }
1063 auto reply = std::move(decoder->result);
1064 decoder.reset(); // free handler-allocated memory before dispatching
1065
1066 std::move(handler)(ec, std::move(reply));
1067 }
1068 };
1069
1070 template<typename Handler>
1071 auto push_entries(const std::deque<cb::list>& data_bufs,
1072 Handler&& handler /* error_code, int */) {
1073 WriteOp op;
1074 std::unique_lock l(m);
1075 auto head_part_num = info.head_part_num;
1076 auto tag = info.head_tag;
1077 auto oid = info.part_oid(head_part_num);
1078 l.unlock();
1079
1080 auto a = ba::get_associated_allocator(handler);
1081 auto reply = ceph::allocate_unique<ExecHandleCB<int>>(a);
1082
1083 auto e = ba::get_associated_executor(handler, get_executor());
1084 push_part(op, tag, data_bufs, std::ref(*reply));
1085 return r->execute(oid, ioc, std::move(op),
1086 ca::bind_ea(e, a, EntryPusher(std::move(handler),
1087 std::move(reply))));
1088 }
1089
1090 template<typename CT>
1091 auto trim_part(int64_t part_num,
1092 uint64_t ofs,
1093 std::optional<std::string_view> tag,
1094 bool exclusive,
1095 CT&& ct) {
1096 WriteOp op;
1097 cls::fifo::trim_part(op, tag, ofs, exclusive);
1098 return r->execute(info.part_oid(part_num), ioc, std::move(op),
1099 std::forward<CT>(ct));
1100 }
1101
1102
1103 template<typename Handler>
1104 class Pusher {
1105 FIFO* f;
1106 std::deque<cb::list> remaining;
1107 std::deque<cb::list> batch;
1108 int i;
1109 Handler handler;
1110
1111 void prep_then_push(const unsigned successes) {
1112 std::unique_lock l(f->m);
1113 auto max_part_size = f->info.params.max_part_size;
1114 auto part_entry_overhead = f->part_entry_overhead;
1115 l.unlock();
1116
1117 uint64_t batch_len = 0;
1118 if (successes > 0) {
1119 if (successes == batch.size()) {
1120 batch.clear();
1121 } else {
1122 batch.erase(batch.begin(), batch.begin() + successes);
1123 for (const auto& b : batch) {
1124 batch_len += b.length() + part_entry_overhead;
1125 }
1126 }
1127 }
1128
1129 if (batch.empty() && remaining.empty()) {
1130 std::move(handler)(bs::error_code{});
1131 return;
1132 }
1133
1134 while (!remaining.empty() &&
1135 (remaining.front().length() + batch_len <= max_part_size)) {
1136
1137 /* We can send entries with data_len up to max_entry_size,
1138 however, we want to also account the overhead when
1139 dealing with multiple entries. Previous check doesn't
1140 account for overhead on purpose. */
1141 batch_len += remaining.front().length() + part_entry_overhead;
1142 batch.push_back(std::move(remaining.front()));
1143 remaining.pop_front();
1144 }
1145 push();
1146 }
1147
1148 void push() {
1149 auto e = ba::get_associated_executor(handler, f->get_executor());
1150 auto a = ba::get_associated_allocator(handler);
1151 f->push_entries(batch,
1152 ca::bind_ea(e, a,
1153 Pusher(f, std::move(remaining),
1154 batch, i,
1155 std::move(handler))));
1156 }
1157
1158 public:
1159
1160 // Initial call!
1161 void operator ()() {
1162 prep_then_push(0);
1163 }
1164
1165 // Called with response to push_entries
1166 void operator ()(bs::error_code ec, int r) {
1167 if (ec == bs::errc::result_out_of_range) {
1168 auto e = ba::get_associated_executor(handler, f->get_executor());
1169 auto a = ba::get_associated_allocator(handler);
1170 f->_prepare_new_head(
1171 ca::bind_ea(e, a,
1172 Pusher(f, std::move(remaining),
1173 std::move(batch), i,
1174 std::move(handler))));
1175 return;
1176 }
1177 if (ec) {
1178 std::move(handler)(ec);
1179 return;
1180 }
1181 i = 0; // We've made forward progress, so reset the race counter!
1182 prep_then_push(r);
1183 }
1184
1185 // Called with response to prepare_new_head
1186 void operator ()(bs::error_code ec) {
1187 if (ec == bs::errc::operation_canceled) {
1188 if (i == MAX_RACE_RETRIES) {
1189 ldout(f->r->cct(), 0)
1190 << "ERROR: " << __func__
1191 << "(): race check failed too many times, likely a bug" << dendl;
1192 std::move(handler)(make_error_code(errc::raced));
1193 return;
1194 }
1195 ++i;
1196 } else if (ec) {
1197 std::move(handler)(ec);
1198 return;
1199 }
1200
1201 if (batch.empty()) {
1202 prep_then_push(0);
1203 return;
1204 } else {
1205 push();
1206 return;
1207 }
1208 }
1209
1210 Pusher(FIFO* f, std::deque<cb::list>&& remaining,
1211 std::deque<cb::list> batch, int i,
1212 Handler&& handler)
1213 : f(f), remaining(std::move(remaining)),
1214 batch(std::move(batch)), i(i),
1215 handler(std::move(handler)) {}
1216 };
1217
1218 template<typename Handler>
1219 class Lister {
1220 FIFO* f;
1221 std::vector<list_entry> result;
1222 bool more = false;
1223 std::int64_t part_num;
1224 std::uint64_t ofs;
1225 int max_entries;
1226 bs::error_code ec_out;
1227 std::vector<fifo::part_list_entry> entries;
1228 bool part_more = false;
1229 bool part_full = false;
1230 Handler handler;
1231
1232 void handle(bs::error_code ec) {
1233 auto h = std::move(handler);
1234 auto m = more;
1235 auto r = std::move(result);
1236
1237 FIFO::assoc_delete(h, this);
1238 std::move(h)(ec, std::move(r), m);
1239 }
1240
1241 public:
1242 Lister(FIFO* f, std::int64_t part_num, std::uint64_t ofs, int max_entries,
1243 Handler&& handler)
1244 : f(f), part_num(part_num), ofs(ofs), max_entries(max_entries),
1245 handler(std::move(handler)) {
1246 result.reserve(max_entries);
1247 }
1248
1249
1250 Lister(const Lister&) = delete;
1251 Lister& operator =(const Lister&) = delete;
1252 Lister(Lister&&) = delete;
1253 Lister& operator =(Lister&&) = delete;
1254
1255 void list() {
1256 if (max_entries > 0) {
1257 ReadOp op;
1258 ec_out.clear();
1259 part_more = false;
1260 part_full = false;
1261 entries.clear();
1262
1263 std::unique_lock l(f->m);
1264 auto part_oid = f->info.part_oid(part_num);
1265 l.unlock();
1266
1267 list_part(op,
1268 {},
1269 ofs,
1270 max_entries,
1271 &ec_out,
1272 &entries,
1273 &part_more,
1274 &part_full,
1275 nullptr);
1276 auto e = ba::get_associated_executor(handler, f->get_executor());
1277 auto a = ba::get_associated_allocator(handler);
1278 f->r->execute(
1279 part_oid,
1280 f->ioc,
1281 std::move(op),
1282 nullptr,
1283 ca::bind_ea(
1284 e, a,
1285 [t = std::unique_ptr<Lister>(this), this,
1286 part_oid](bs::error_code ec) mutable {
1287 t.release();
1288 if (ec == bs::errc::no_such_file_or_directory) {
1289 auto e = ba::get_associated_executor(handler,
1290 f->get_executor());
1291 auto a = ba::get_associated_allocator(handler);
1292 f->_read_meta(
1293 ca::bind_ea(
1294 e, a,
1295 [this](bs::error_code ec) mutable {
1296 if (ec) {
1297 handle(ec);
1298 return;
1299 }
1300
1301 if (part_num < f->info.tail_part_num) {
1302 /* raced with trim? restart */
1303 max_entries += result.size();
1304 result.clear();
1305 part_num = f->info.tail_part_num;
1306 ofs = 0;
1307 list();
1308 }
1309 /* assuming part was not written yet, so end of data */
1310 more = false;
1311 handle({});
1312 return;
1313 }));
1314 return;
1315 }
1316 if (ec) {
1317 ldout(f->r->cct(), 0)
1318 << __func__
1319 << "(): list_part() on oid=" << part_oid
1320 << " returned ec=" << ec.message() << dendl;
1321 handle(ec);
1322 return;
1323 }
1324 if (ec_out) {
1325 ldout(f->r->cct(), 0)
1326 << __func__
1327 << "(): list_part() on oid=" << f->info.part_oid(part_num)
1328 << " returned ec=" << ec_out.message() << dendl;
1329 handle(ec_out);
1330 return;
1331 }
1332
1333 more = part_full || part_more;
1334 for (auto& entry : entries) {
1335 list_entry e;
1336 e.data = std::move(entry.data);
1337 e.marker = marker{part_num, entry.ofs}.to_string();
1338 e.mtime = entry.mtime;
1339 result.push_back(std::move(e));
1340 }
1341 max_entries -= entries.size();
1342 entries.clear();
1343 if (max_entries > 0 &&
1344 part_more) {
1345 list();
1346 return;
1347 }
1348
1349 if (!part_full) { /* head part is not full */
1350 handle({});
1351 return;
1352 }
1353 ++part_num;
1354 ofs = 0;
1355 list();
1356 }));
1357 } else {
1358 handle({});
1359 return;
1360 }
1361 }
1362 };
1363
1364 template<typename Handler>
1365 class Trimmer {
1366 FIFO* f;
1367 std::int64_t part_num;
1368 std::uint64_t ofs;
1369 bool exclusive;
1370 Handler handler;
1371 std::int64_t pn;
1372 int i = 0;
1373
1374 void handle(bs::error_code ec) {
1375 auto h = std::move(handler);
1376
1377 FIFO::assoc_delete(h, this);
1378 return std::move(h)(ec);
1379 }
1380
1381 void update() {
1382 std::unique_lock l(f->m);
1383 auto objv = f->info.version;
1384 l.unlock();
1385 auto a = ba::get_associated_allocator(handler);
1386 auto e = ba::get_associated_executor(handler, f->get_executor());
1387 f->_update_meta(
1388 fifo::update{}.tail_part_num(part_num),
1389 objv,
1390 ca::bind_ea(
1391 e, a,
1392 [this, t = std::unique_ptr<Trimmer>(this)](bs::error_code ec,
1393 bool canceled) mutable {
1394 t.release();
1395 if (canceled)
1396 if (i >= MAX_RACE_RETRIES) {
1397 ldout(f->r->cct(), 0)
1398 << "ERROR: " << __func__
1399 << "(): race check failed too many times, likely a bug"
1400 << dendl;
1401 handle(errc::raced);
1402 return;
1403 }
1404 std::unique_lock l(f->m);
1405 auto tail_part_num = f->info.tail_part_num;
1406 l.unlock();
1407 if (tail_part_num < part_num) {
1408 ++i;
1409 update();
1410 return;
1411 }
1412 handle({});
1413 return;
1414 }));
1415 }
1416
1417 public:
1418 Trimmer(FIFO* f, std::int64_t part_num, std::uint64_t ofs,
1419 bool exclusive, Handler&& handler)
1420 : f(f), part_num(part_num), ofs(ofs), exclusive(exclusive),
1421 handler(std::move(handler)) {
1422 std::unique_lock l(f->m);
1423 pn = f->info.tail_part_num;
1424 }
1425
1426 void trim() {
1427 auto a = ba::get_associated_allocator(handler);
1428 auto e = ba::get_associated_executor(handler, f->get_executor());
1429 if (pn < part_num) {
1430 std::unique_lock l(f->m);
1431 auto max_part_size = f->info.params.max_part_size;
1432 l.unlock();
1433 f->trim_part(
1434 pn, max_part_size, std::nullopt,
1435 false,
1436 ca::bind_ea(
1437 e, a,
1438 [t = std::unique_ptr<Trimmer>(this),
1439 this](bs::error_code ec) mutable {
1440 t.release();
1441 if (ec && ec != bs::errc::no_such_file_or_directory) {
1442 ldout(f->r->cct(), 0)
1443 << __func__ << "(): ERROR: trim_part() on part="
1444 << pn << " returned ec=" << ec.message() << dendl;
1445 handle(ec);
1446 return;
1447 }
1448 ++pn;
1449 trim();
1450 }));
1451 return;
1452 }
1453 f->trim_part(
1454 part_num, ofs, std::nullopt, exclusive,
1455 ca::bind_ea(
1456 e, a,
1457 [t = std::unique_ptr<Trimmer>(this),
1458 this](bs::error_code ec) mutable {
1459 t.release();
1460 if (ec && ec != bs::errc::no_such_file_or_directory) {
1461 ldout(f->r->cct(), 0)
1462 << __func__ << "(): ERROR: trim_part() on part=" << part_num
1463 << " returned ec=" << ec.message() << dendl;
1464 handle(ec);
1465 return;
1466 }
1467 std::unique_lock l(f->m);
1468 auto tail_part_num = f->info.tail_part_num;
1469 l.unlock();
1470 if (part_num <= tail_part_num) {
1471 /* don't need to modify meta info */
1472 handle({});
1473 return;
1474 }
1475 update();
1476 }));
1477 }
1478 };
1479
1480 template<typename Handler>
1481 class PartInfoGetter {
1482 Handler handler;
1483 using allocator_type = boost::asio::associated_allocator_t<Handler>;
1484 using decoder_type = ExecDecodeCB<fifo::op::get_part_info_reply>;
1485 using decoder_ptr = ceph::allocated_unique_ptr<decoder_type, allocator_type>;
1486 decoder_ptr decoder;
1487 public:
1488 PartInfoGetter(Handler&& handler, decoder_ptr&& decoder)
1489 : handler(std::move(handler)), decoder(std::move(decoder)) {}
1490
1491 void operator ()(bs::error_code ec) {
1492 if (!ec) {
1493 ec = decoder->ec;
1494 }
1495 auto reply = std::move(decoder->result);
1496 decoder.reset(); // free handler-allocated memory before dispatching
1497
1498 auto p = ca::bind_handler(std::move(handler),
1499 ec, std::move(reply.header));
1500 std::move(p)();
1501 }
1502 };
1503
1504
1505};
1506
1507namespace detail {
1508template<typename Handler>
1509class JournalProcessor {
1510private:
1511 FIFO* const fifo;
1512 Handler handler;
1513
1514 std::vector<fifo::journal_entry> processed;
1515 std::multimap<std::int64_t, fifo::journal_entry> journal;
1516 std::multimap<std::int64_t, fifo::journal_entry>::iterator iter;
1517 std::int64_t new_tail;
1518 std::int64_t new_head;
1519 std::int64_t new_max;
1520 int race_retries = 0;
1521
1522 template<typename CT>
1523 auto create_part(int64_t part_num, std::string_view tag, CT&& ct) {
1524 WriteOp op;
1525 op.create(false); /* We don't need exclusivity, part_init ensures
1526 we're creating from the same journal entry. */
1527 std::unique_lock l(fifo->m);
1528 part_init(op, tag, fifo->info.params);
1529 auto oid = fifo->info.part_oid(part_num);
1530 l.unlock();
1531 return fifo->r->execute(oid, fifo->ioc,
1532 std::move(op), std::forward<CT>(ct));
1533 }
1534
1535 template<typename CT>
1536 auto remove_part(int64_t part_num, std::string_view tag, CT&& ct) {
1537 WriteOp op;
1538 op.remove();
1539 std::unique_lock l(fifo->m);
1540 auto oid = fifo->info.part_oid(part_num);
1541 l.unlock();
1542 return fifo->r->execute(oid, fifo->ioc,
1543 std::move(op), std::forward<CT>(ct));
1544 }
1545
1546 template<typename PP>
1547 void process_journal_entry(const fifo::journal_entry& entry,
1548 PP&& pp) {
1549 switch (entry.op) {
1550 case fifo::journal_entry::Op::unknown:
1551 std::move(pp)(errc::inconsistency);
1552 return;
1553 break;
1554
1555 case fifo::journal_entry::Op::create:
1556 create_part(entry.part_num, entry.part_tag, std::move(pp));
1557 return;
1558 break;
1559 case fifo::journal_entry::Op::set_head:
1560 ba::post(ba::get_associated_executor(handler, fifo->get_executor()),
1561 [pp = std::move(pp)]() mutable {
1562 std::move(pp)(bs::error_code{});
1563 });
1564 return;
1565 break;
1566 case fifo::journal_entry::Op::remove:
1567 remove_part(entry.part_num, entry.part_tag, std::move(pp));
1568 return;
1569 break;
1570 }
1571 std::move(pp)(errc::inconsistency);
1572 return;
1573 }
1574
1575 auto journal_entry_finisher(const fifo::journal_entry& entry) {
1576 auto a = ba::get_associated_allocator(handler);
1577 auto e = ba::get_associated_executor(handler, fifo->get_executor());
1578 return
1579 ca::bind_ea(
1580 e, a,
1581 [t = std::unique_ptr<JournalProcessor>(this), this,
1582 entry](bs::error_code ec) mutable {
1583 t.release();
1584 if (entry.op == fifo::journal_entry::Op::remove &&
1585 ec == bs::errc::no_such_file_or_directory)
1586 ec.clear();
1587
1588 if (ec) {
1589 ldout(fifo->r->cct(), 0)
1590 << __func__
1591 << "(): ERROR: failed processing journal entry for part="
1592 << entry.part_num << " with error " << ec.message()
1593 << " Bug or inconsistency." << dendl;
1594 handle(errc::inconsistency);
1595 return;
1596 } else {
1597 switch (entry.op) {
1598 case fifo::journal_entry::Op::unknown:
1599 // Can't happen. Filtered out in process_journal_entry.
1600 abort();
1601 break;
1602
1603 case fifo::journal_entry::Op::create:
1604 if (entry.part_num > new_max) {
1605 new_max = entry.part_num;
1606 }
1607 break;
1608 case fifo::journal_entry::Op::set_head:
1609 if (entry.part_num > new_head) {
1610 new_head = entry.part_num;
1611 }
1612 break;
1613 case fifo::journal_entry::Op::remove:
1614 if (entry.part_num >= new_tail) {
1615 new_tail = entry.part_num + 1;
1616 }
1617 break;
1618 }
1619 processed.push_back(entry);
1620 }
1621 ++iter;
1622 process();
1623 });
1624 }
1625
1626 struct JournalPostprocessor {
1627 std::unique_ptr<JournalProcessor> j_;
1628 bool first;
1629 void operator ()(bs::error_code ec, bool canceled) {
1630 std::optional<int64_t> tail_part_num;
1631 std::optional<int64_t> head_part_num;
1632 std::optional<int64_t> max_part_num;
1633
1634 auto j = j_.release();
1635
1636 if (!first && !ec && !canceled) {
1637 j->handle({});
1638 return;
1639 }
1640
1641 if (canceled) {
1642 if (j->race_retries >= MAX_RACE_RETRIES) {
1643 ldout(j->fifo->r->cct(), 0) << "ERROR: " << __func__ <<
1644 "(): race check failed too many times, likely a bug" << dendl;
1645 j->handle(errc::raced);
1646 return;
1647 }
1648
1649 ++j->race_retries;
1650
1651 std::vector<fifo::journal_entry> new_processed;
1652 std::unique_lock l(j->fifo->m);
1653 for (auto& e : j->processed) {
1654 auto jiter = j->fifo->info.journal.find(e.part_num);
1655 /* journal entry was already processed */
1656 if (jiter == j->fifo->info.journal.end() ||
1657 !(jiter->second == e)) {
1658 continue;
1659 }
1660 new_processed.push_back(e);
1661 }
1662 j->processed = std::move(new_processed);
1663 }
1664
1665 std::unique_lock l(j->fifo->m);
1666 auto objv = j->fifo->info.version;
1667 if (j->new_tail > j->fifo->info.tail_part_num) {
1668 tail_part_num = j->new_tail;
1669 }
1670
1671 if (j->new_head > j->fifo->info.head_part_num) {
1672 head_part_num = j->new_head;
1673 }
1674
1675 if (j->new_max > j->fifo->info.max_push_part_num) {
1676 max_part_num = j->new_max;
1677 }
1678 l.unlock();
1679
1680 if (j->processed.empty() &&
1681 !tail_part_num &&
1682 !max_part_num) {
1683 /* nothing to update anymore */
1684 j->handle({});
1685 return;
1686 }
1687 auto a = ba::get_associated_allocator(j->handler);
1688 auto e = ba::get_associated_executor(j->handler, j->fifo->get_executor());
1689 j->fifo->_update_meta(fifo::update{}
1690 .tail_part_num(tail_part_num)
1691 .head_part_num(head_part_num)
1692 .max_push_part_num(max_part_num)
1693 .journal_entries_rm(j->processed),
1694 objv,
1695 ca::bind_ea(
1696 e, a,
1697 JournalPostprocessor{j, false}));
1698 return;
1699 }
1700
1701 JournalPostprocessor(JournalProcessor* j, bool first)
1702 : j_(j), first(first) {}
1703 };
1704
1705 void postprocess() {
1706 if (processed.empty()) {
1707 handle({});
1708 return;
1709 }
1710 JournalPostprocessor(this, true)({}, false);
1711 }
1712
1713 void handle(bs::error_code ec) {
1714 auto e = ba::get_associated_executor(handler, fifo->get_executor());
1715 auto a = ba::get_associated_allocator(handler);
1716 auto h = std::move(handler);
1717 FIFO::assoc_delete(h, this);
1718 e.dispatch(ca::bind_handler(std::move(h), ec), a);
1719 return;
1720 }
1721
1722public:
1723
1724 JournalProcessor(FIFO* fifo, Handler&& handler)
1725 : fifo(fifo), handler(std::move(handler)) {
1726 std::unique_lock l(fifo->m);
1727 journal = fifo->info.journal;
1728 iter = journal.begin();
1729 new_tail = fifo->info.tail_part_num;
1730 new_head = fifo->info.head_part_num;
1731 new_max = fifo->info.max_push_part_num;
1732 }
1733
1734 JournalProcessor(const JournalProcessor&) = delete;
1735 JournalProcessor& operator =(const JournalProcessor&) = delete;
1736 JournalProcessor(JournalProcessor&&) = delete;
1737 JournalProcessor& operator =(JournalProcessor&&) = delete;
1738
1739 void process() {
1740 if (iter != journal.end()) {
1741 const auto entry = iter->second;
1742 process_journal_entry(entry,
1743 journal_entry_finisher(entry));
1744 return;
1745 } else {
1746 postprocess();
1747 return;
1748 }
1749 }
1750};
1751}
1752}
1753
1754#endif // CEPH_RADOS_CLS_FIFIO_H