]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2020 Red Hat <contact@redhat.com> | |
7 | * Author: Adam C. Emerson | |
8 | * | |
9 | * This is free software; you can redistribute it and/or | |
10 | * modify it under the terms of the GNU Lesser General Public | |
11 | * License version 2.1, as published by the Free Software | |
12 | * Foundation. See file COPYING. | |
13 | * | |
14 | */ | |
15 | ||
16 | #ifndef CEPH_NEORADOS_CLS_FIFIO_H | |
17 | #define CEPH_NEORADOS_CLS_FIFIO_H | |
18 | ||
#include <cstdint>
#include <deque>
#include <limits>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <string_view>
#include <vector>
27 | ||
28 | #include <boost/asio.hpp> | |
29 | #include <boost/system/error_code.hpp> | |
30 | ||
31 | #undef FMT_HEADER_ONLY | |
32 | #define FMT_HEADER_ONLY 1 | |
33 | #include <fmt/format.h> | |
34 | ||
35 | #include "include/neorados/RADOS.hpp" | |
36 | #include "include/buffer.h" | |
37 | ||
38 | #include "common/allocate_unique.h" | |
39 | #include "common/async/bind_handler.h" | |
40 | #include "common/async/bind_like.h" | |
41 | #include "common/async/completion.h" | |
42 | #include "common/async/forward_handler.h" | |
43 | ||
44 | #include "common/dout.h" | |
45 | ||
46 | #include "cls/fifo/cls_fifo_types.h" | |
47 | #include "cls/fifo/cls_fifo_ops.h" | |
48 | ||
49 | namespace neorados::cls::fifo { | |
50 | namespace ba = boost::asio; | |
51 | namespace bs = boost::system; | |
52 | namespace ca = ceph::async; | |
53 | namespace cb = ceph::buffer; | |
54 | namespace fifo = rados::cls::fifo; | |
55 | ||
56 | inline constexpr auto dout_subsys = ceph_subsys_rados; | |
57 | inline constexpr std::uint64_t default_max_part_size = 4 * 1024 * 1024; | |
58 | inline constexpr std::uint64_t default_max_entry_size = 32 * 1024; | |
59 | inline constexpr auto MAX_RACE_RETRIES = 10; | |
60 | ||
61 | ||
62 | const boost::system::error_category& error_category() noexcept; | |
63 | ||
64 | enum class errc { | |
65 | raced = 1, | |
66 | inconsistency, | |
67 | entry_too_large, | |
68 | invalid_marker, | |
69 | update_failed | |
70 | }; | |
71 | } | |
72 | ||
73 | namespace boost::system { | |
74 | template<> | |
75 | struct is_error_code_enum<::neorados::cls::fifo::errc> { | |
76 | static const bool value = true; | |
77 | }; | |
78 | template<> | |
79 | struct is_error_condition_enum<::neorados::cls::fifo::errc> { | |
80 | static const bool value = false; | |
81 | }; | |
82 | } | |
83 | ||
84 | namespace neorados::cls::fifo { | |
85 | // explicit conversion: | |
86 | inline bs::error_code make_error_code(errc e) noexcept { | |
87 | return { static_cast<int>(e), error_category() }; | |
88 | } | |
89 | ||
90 | inline bs::error_code make_error_category(errc e) noexcept { | |
91 | return { static_cast<int>(e), error_category() }; | |
92 | } | |
93 | ||
94 | void create_meta(WriteOp& op, std::string_view id, | |
95 | std::optional<fifo::objv> objv, | |
96 | std::optional<std::string_view> oid_prefix, | |
97 | bool exclusive = false, | |
98 | std::uint64_t max_part_size = default_max_part_size, | |
99 | std::uint64_t max_entry_size = default_max_entry_size); | |
100 | void get_meta(ReadOp& op, std::optional<fifo::objv> objv, | |
101 | bs::error_code* ec_out, fifo::info* info, | |
102 | std::uint32_t* part_header_size, | |
103 | std::uint32_t* part_entry_overhead); | |
104 | ||
105 | void update_meta(WriteOp& op, const fifo::objv& objv, | |
106 | const fifo::update& desc); | |
107 | ||
108 | void part_init(WriteOp& op, std::string_view tag, | |
109 | fifo::data_params params); | |
110 | ||
111 | void push_part(WriteOp& op, std::string_view tag, | |
112 | std::deque<cb::list> data_bufs, | |
113 | fu2::unique_function<void(bs::error_code, int)>); | |
114 | void trim_part(WriteOp& op, std::optional<std::string_view> tag, | |
115 | std::uint64_t ofs, | |
116 | bool exclusive); | |
117 | void list_part(ReadOp& op, | |
118 | std::optional<std::string_view> tag, | |
119 | std::uint64_t ofs, | |
120 | std::uint64_t max_entries, | |
121 | bs::error_code* ec_out, | |
122 | std::vector<fifo::part_list_entry>* entries, | |
123 | bool* more, | |
124 | bool* full_part, | |
125 | std::string* ptag); | |
126 | void get_part_info(ReadOp& op, | |
127 | bs::error_code* out_ec, | |
128 | fifo::part_header* header); | |
129 | ||
130 | struct marker { | |
131 | std::int64_t num = 0; | |
132 | std::uint64_t ofs = 0; | |
133 | ||
134 | marker() = default; | |
135 | marker(std::int64_t num, std::uint64_t ofs) : num(num), ofs(ofs) {} | |
136 | static marker max() { | |
137 | return { std::numeric_limits<decltype(num)>::max(), | |
138 | std::numeric_limits<decltype(ofs)>::max() }; | |
139 | } | |
140 | ||
141 | std::string to_string() { | |
142 | return fmt::format("{:0>20}:{:0>20}", num, ofs); | |
143 | } | |
144 | }; | |
145 | ||
146 | struct list_entry { | |
147 | cb::list data; | |
148 | std::string marker; | |
149 | ceph::real_time mtime; | |
150 | }; | |
151 | ||
152 | using part_info = fifo::part_header; | |
153 | ||
154 | namespace detail { | |
155 | template<typename Handler> | |
156 | class JournalProcessor; | |
157 | } | |
158 | ||
159 | /// Completions, Handlers, and CompletionTokens | |
160 | /// =========================================== | |
161 | /// | |
162 | /// This class is based on Boost.Asio. For information, see | |
163 | /// https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio.html | |
164 | /// | |
165 | /// As summary, Asio's design is that of functions taking completion | |
166 | /// handlers. Every handler has a signature, like | |
167 | /// (boost::system::error_code, std::string). The completion handler | |
168 | /// receives the result of the function, and the signature is the type | |
169 | /// of that result. | |
170 | /// | |
171 | /// The completion handler is specified with a CompletionToken. The | |
172 | /// CompletionToken is any type that has a specialization of | |
173 | /// async_completion and async_result. See |
174 | /// https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/async_completion.html | |
175 | /// and https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/async_result.html | |
176 | /// | |
177 | /// The return type of a function taking a CompletionToken is | |
178 | /// async_result<CompletionToken, Signature>::return_type. | |
179 | /// | |
180 | /// Functions | |
181 | /// --------- | |
182 | /// | |
183 | /// The default implementations treat whatever value is described as a | |
184 | /// function, whose parameters correspond to the signature, and calls | |
185 | /// it upon completion. | |
186 | /// | |
187 | /// EXAMPLE: | |
188 | /// Let f be an asynchronous function whose signature is (bs::error_code, int) | |
189 | /// Let g be an asynchronous function whose signature is | |
190 | /// (bs::error_code, int, std::string). | |
191 | /// | |
192 | /// | |
193 | /// f([](bs::error_code ec, int i) { ... }); | |
194 | /// g([](bs::error_code ec, int i, std::string s) { ... }); | |
195 | /// | |
196 | /// Will schedule asynchronous tasks, and the provided lambdas will be | |
197 | /// called on completion. In this case, f and g return void. | |
198 | /// | |
199 | /// There are other specializations. Commonly used ones are: |
200 | /// | |
201 | /// Futures | |
202 | /// ------- | |
203 | /// | |
204 | /// A CompletionToken of boost::asio::use_future will complete with a | |
205 | /// promise whose type matches (minus any initial error_code) the | |
206 | /// function's signature. The corresponding future is returned. If the | |
207 | /// error_code of the result is non-zero, the future is set with an | |
208 | /// exception of type boost::asio::system_error. | |
209 | /// | |
210 | /// See https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/use_future_t.html | |
211 | /// | |
212 | /// EXAMPLE: | |
213 | /// | |
214 | /// std::future<int> fi = f(ba::use_future); |
215 | /// std::future<std::tuple<int, std::string>> fg = g(ba::use_future); |
216 | /// | |
217 | /// Coroutines | |
218 | /// ---------- | |
219 | /// | |
220 | /// A CompletionToken of type spawn::yield_context suspends execution | |
221 | /// of the current coroutine until completion of the operation. See | |
222 | /// src/spawn/README.md | |
223 | /// https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/spawn.html and | |
224 | /// https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/yield_context.html | |
225 | /// | |
226 | /// Operations given this CompletionToken return their results, modulo | |
227 | /// any leading error_code. A non-zero error code will be thrown, by | |
228 | /// default, but may be bound to a variable instead with the overload | |
229 | /// of the array-subscript operator. |
230 | /// | |
231 | /// EXAMPLE: | |
232 | /// // Within a function with a yield_context parameter named y | |
233 | /// | |
234 | /// try { | |
235 | /// int i = f(y); | |
236 | /// } catch (const bs::system_error& ec) { ... } | |
237 | /// | |
238 | /// bs::error_code ec; | |
239 | /// auto [i, s] = g(y[ec]); | |
240 | /// | |
241 | /// Blocking calls | |
242 | /// -------------- | |
243 | /// | |
244 | /// ceph::async::use_blocked, defined in src/common/async/blocked_completion.h | |
245 | /// Suspends the current thread of execution, returning the results of | |
246 | /// the operation on resumption. Its calling convention is analogous to | |
247 | /// that of yield_context. | |
248 | /// | |
249 | /// EXAMPLE: | |
250 | /// try { | |
251 | /// int i = f(ca::use_blocked); | |
252 | /// } catch (const bs::system_error& e) { ... } | |
253 | /// | |
254 | /// bs::error_code ec; | |
255 | /// auto [i, s] = g(ca::use_blocked[ec]); | |
256 | /// | |
257 | /// librados Completions | |
258 | /// -------------------- | |
259 | /// | |
260 | /// If src/common/async/librados_completion.h is included in the | |
261 | /// current translation unit, then librados::AioCompletion* may be used | |
262 | /// as a CompletionToken. This is only permitted when the completion | |
263 | /// signature is either bs::system_error or void. The return type of | |
264 | /// functions provided a CompletionToken of AioCompletion* is void. If | |
265 | /// the signature includes an error code and the error code is set, | |
266 | /// then the error is translated to an int which is set as the result | |
267 | /// of the AioCompletion. | |
268 | /// | |
269 | /// EXAMPLE: | |
270 | /// // Assume an asynchronous function h whose signature is bs::error_code. | |
271 | /// | |
272 | /// AioCompletion* c = Rados::aio_create_completion(); | |
273 | /// h(c); | |
274 | /// int r = c.get_return_value(); | |
275 | /// | |
276 | /// See also src/test/cls_fifo/bench_cls_fifo.cc for a full, simple | |
277 | /// example of a program using this class with coroutines. | |
278 | /// | |
279 | /// | |
280 | /// Markers | |
281 | /// ======= | |
282 | /// | |
283 | /// Markers represent a position within the FIFO. Internally, they are | |
284 | /// part/offset pairs. Externally, they are ordered but otherwise | |
285 | /// opaque strings. Markers that compare lower denote positions closer | |
286 | /// to the tail. | |
287 | /// | |
288 | /// A marker is returned with every entry from a list() operation. They | |
289 | /// may be supplied to a list operation to resume from a given | |
290 | /// position, and must be supplied to trim give the position to which | |
291 | /// to trim. | |
292 | ||
293 | class FIFO { | |
294 | public: | |
295 | ||
296 | FIFO(const FIFO&) = delete; | |
297 | FIFO& operator =(const FIFO&) = delete; | |
298 | FIFO(FIFO&&) = delete; | |
299 | FIFO& operator =(FIFO&&) = delete; | |
300 | ||
301 | /// Open an existing FIFO. | |
302 | /// Signature: (bs::error_code ec, std::unique_ptr<FIFO> f) | |
303 | template<typename CT> | |
304 | static auto open(RADOS& r, //< RADOS handle | |
305 | const IOContext& ioc, //< Context for pool, namespace, etc. | |
306 | Object oid, //< OID for the 'main' object of the FIFO | |
307 | CT&& ct, //< CompletionToken | |
308 | /// Fail if is not this version | |
309 | std::optional<fifo::objv> objv = std::nullopt, | |
310 | /// Default executor. By default use the one | |
311 | /// associated with the RADOS handle. | |
312 | std::optional<ba::executor> executor = std::nullopt) { | |
313 | ba::async_completion<CT, void(bs::error_code, | |
314 | std::unique_ptr<FIFO>)> init(ct); | |
315 | auto e = ba::get_associated_executor(init.completion_handler, | |
316 | executor.value_or(r.get_executor())); | |
317 | auto a = ba::get_associated_allocator(init.completion_handler); | |
318 | _read_meta_( | |
319 | &r, oid, ioc, objv, | |
320 | ca::bind_ea( | |
321 | e, a, | |
322 | [&r, ioc, oid, executor, handler = std::move(init.completion_handler)] | |
323 | (bs::error_code ec, fifo::info info, | |
324 | std::uint32_t size, std::uint32_t over) mutable { | |
325 | std::unique_ptr<FIFO> f( | |
326 | new FIFO(r, ioc, oid, executor.value_or(r.get_executor()))); | |
327 | f->info = info; | |
328 | f->part_header_size = size; | |
329 | f->part_entry_overhead = over; | |
330 | // If there are journal entries, process them, in case | |
331 | // someone crashed mid-transaction. | |
332 | if (!ec && !info.journal.empty()) { | |
333 | auto e = ba::get_associated_executor(handler, f->get_executor()); | |
334 | auto a = ba::get_associated_allocator(handler); | |
335 | auto g = f.get(); | |
336 | g->_process_journal( | |
337 | ca::bind_ea( | |
338 | e, a, | |
339 | [f = std::move(f), | |
340 | handler = std::move(handler)](bs::error_code ec) mutable { | |
341 | std::move(handler)(ec, std::move(f)); | |
342 | })); | |
343 | return; | |
344 | } | |
345 | std::move(handler)(ec, std::move(f)); | |
346 | return; | |
347 | })); | |
348 | return init.result.get(); | |
349 | } | |
350 | ||
351 | /// Open an existing or create a new FIFO. | |
352 | /// Signature: (bs::error_code ec, std::unique_ptr<FIFO> f) | |
353 | template<typename CT> | |
354 | static auto create(RADOS& r, /// RADOS handle | |
355 | const IOContext& ioc, /// Context for pool, namespace, etc. | |
356 | Object oid, /// OID for the 'main' object of the FIFO | |
357 | CT&& ct, /// CompletionToken | |
358 | /// Fail if FIFO exists and is not this version | |
359 | std::optional<fifo::objv> objv = std::nullopt, | |
360 | /// Custom prefix for parts | |
361 | std::optional<std::string_view> oid_prefix = std::nullopt, | |
362 | /// Fail if FIFO already exists | |
363 | bool exclusive = false, | |
364 | /// Size at which a part is considered full | |
365 | std::uint64_t max_part_size = default_max_part_size, | |
366 | /// Maximum size of any entry | |
367 | std::uint64_t max_entry_size = default_max_entry_size, | |
368 | /// Default executor. By default use the one | |
369 | /// associated with the RADOS handle. | |
370 | std::optional<ba::executor> executor = std::nullopt) { | |
371 | ba::async_completion<CT, void(bs::error_code, | |
372 | std::unique_ptr<FIFO>)> init(ct); | |
373 | WriteOp op; | |
374 | create_meta(op, oid, objv, oid_prefix, exclusive, max_part_size, | |
375 | max_entry_size); | |
376 | auto e = ba::get_associated_executor(init.completion_handler, | |
377 | executor.value_or(r.get_executor())); | |
378 | auto a = ba::get_associated_allocator(init.completion_handler); | |
379 | r.execute( | |
380 | oid, ioc, std::move(op), | |
381 | ca::bind_ea( | |
382 | e, a, | |
383 | [objv, &r, ioc, oid, executor, handler = std::move(init.completion_handler)] | |
384 | (bs::error_code ec) mutable { | |
385 | if (ec) { | |
386 | std::move(handler)(ec, nullptr); | |
387 | return; | |
388 | } | |
389 | auto e = ba::get_associated_executor( | |
390 | handler, executor.value_or(r.get_executor())); | |
391 | auto a = ba::get_associated_allocator(handler); | |
392 | FIFO::_read_meta_( | |
393 | &r, oid, ioc, objv, | |
394 | ca::bind_ea( | |
395 | e, a, | |
396 | [&r, ioc, executor, oid, handler = std::move(handler)] | |
397 | (bs::error_code ec, fifo::info info, | |
398 | std::uint32_t size, std::uint32_t over) mutable { | |
399 | std::unique_ptr<FIFO> f( | |
400 | new FIFO(r, ioc, oid, executor.value_or(r.get_executor()))); | |
401 | f->info = info; | |
402 | f->part_header_size = size; | |
403 | f->part_entry_overhead = over; | |
404 | if (!ec && !info.journal.empty()) { | |
405 | auto e = ba::get_associated_executor(handler, | |
406 | f->get_executor()); | |
407 | auto a = ba::get_associated_allocator(handler); | |
408 | auto g = f.get(); | |
409 | g->_process_journal( | |
410 | ca::bind_ea( | |
411 | e, a, | |
412 | [f = std::move(f), handler = std::move(handler)] | |
413 | (bs::error_code ec) mutable { | |
414 | std::move(handler)(ec, std::move(f)); | |
415 | })); | |
416 | return; | |
417 | } | |
418 | std::move(handler)(ec, std::move(f)); | |
419 | })); | |
420 | })); | |
421 | return init.result.get(); | |
422 | } | |
423 | ||
424 | /// Force a re-read of FIFO metadata. | |
425 | /// Signature: (bs::error_code ec) | |
426 | template<typename CT> | |
427 | auto read_meta(CT&& ct, //< CompletionToken | |
428 | /// Fail if FIFO not at this version | |
429 | std::optional<fifo::objv> objv = std::nullopt) { | |
430 | std::unique_lock l(m); | |
431 | auto version = info.version; | |
432 | l.unlock(); | |
433 | ba::async_completion<CT, void(bs::error_code)> init(ct); | |
434 | auto e = ba::get_associated_executor(init.completion_handler, | |
435 | get_executor()); | |
436 | auto a = ba::get_associated_allocator(init.completion_handler); | |
437 | _read_meta_( | |
438 | r, oid, ioc, objv, | |
439 | ca::bind_ea( | |
440 | e, a, | |
441 | [this, version, handler = std::move(init.completion_handler)] | |
442 | (bs::error_code ec, fifo::info newinfo, | |
443 | std::uint32_t size, std::uint32_t over) mutable { | |
444 | std::unique_lock l(m); | |
445 | if (version == info.version) { | |
446 | info = newinfo; | |
447 | part_header_size = size; | |
448 | part_entry_overhead = over; | |
449 | } | |
450 | l.unlock(); | |
451 | return std::move(handler)(ec); | |
452 | })); | |
453 | return init.result.get(); | |
454 | } | |
455 | ||
456 | /// Return a reference to currently known metadata | |
457 | const fifo::info& meta() const { | |
458 | return info; | |
459 | } | |
460 | ||
461 | /// Return header size and entry overhead of partitions. | |
462 | std::pair<std::uint32_t, std::uint32_t> get_part_layout_info() { | |
463 | return {part_header_size, part_entry_overhead}; | |
464 | } | |
465 | ||
466 | /// Push a single entry to the FIFO. | |
467 | /// Signature: (bs::error_code) | |
468 | template<typename CT> | |
469 | auto push(const cb::list& bl, //< Bufferlist holding entry to push | |
470 | CT&& ct //< CompletionToken | |
471 | ) { | |
472 | return push(std::vector{ bl }, std::forward<CT>(ct)); | |
473 | } | |
474 | ||
475 | /// Push a many entries to the FIFO. | |
476 | /// Signature: (bs::error_code) | |
477 | template<typename CT> | |
478 | auto push(const std::vector<cb::list>& data_bufs, //< Entries to push | |
479 | CT&& ct //< CompletionToken | |
480 | ) { | |
481 | ba::async_completion<CT, void(bs::error_code)> init(ct); | |
482 | std::unique_lock l(m); | |
483 | auto max_entry_size = info.params.max_entry_size; | |
484 | auto need_new_head = info.need_new_head(); | |
485 | l.unlock(); | |
486 | auto e = ba::get_associated_executor(init.completion_handler, | |
487 | get_executor()); | |
488 | auto a = ba::get_associated_allocator(init.completion_handler); | |
489 | if (data_bufs.empty() ) { | |
490 | // Can't fail if you don't try. | |
491 | e.post(ca::bind_handler(std::move(init.completion_handler), | |
492 | bs::error_code{}), a); | |
493 | return init.result.get(); | |
494 | } | |
495 | ||
496 | // Validate sizes | |
497 | for (const auto& bl : data_bufs) { | |
498 | if (bl.length() > max_entry_size) { | |
499 | ldout(r->cct(), 10) << __func__ << "(): entry too large: " | |
500 | << bl.length() << " > " | |
501 | << info.params.max_entry_size << dendl; | |
502 | e.post(ca::bind_handler(std::move(init.completion_handler), | |
503 | errc::entry_too_large), a); | |
504 | return init.result.get(); | |
505 | } | |
506 | } | |
507 | ||
508 | auto p = ca::bind_ea(e, a, | |
509 | Pusher(this, {data_bufs.begin(), data_bufs.end()}, | |
510 | {}, 0, std::move(init.completion_handler))); | |
511 | ||
512 | if (need_new_head) { | |
513 | _prepare_new_head(std::move(p)); | |
514 | } else { | |
515 | e.dispatch(std::move(p), a); | |
516 | } | |
517 | return init.result.get(); | |
518 | } | |
519 | ||
520 | /// List the entries in a FIFO | |
521 | /// Signature(bs::error_code ec, bs::vector<list_entry> entries, bool more) | |
522 | /// | |
523 | /// More is true if entries beyond the last exist. | |
524 | /// The list entries are of the form: | |
525 | /// data - Contents of the entry | |
526 | /// marker - String representing the position of this entry within the FIFO. | |
527 | /// mtime - Time (on the OSD) at which the entry was pushed. | |
528 | template<typename CT> | |
529 | auto list(int max_entries, //< Maximum number of entries to fetch | |
530 | /// Optionally, a marker indicating the position after | |
531 | /// which to begin listing. If null, begin at the tail. | |
532 | std::optional<std::string_view> markstr, | |
533 | CT&& ct //< CompletionToken | |
534 | ) { | |
535 | ba::async_completion<CT, void(bs::error_code, | |
536 | std::vector<list_entry>, bool)> init(ct); | |
537 | std::unique_lock l(m); | |
538 | std::int64_t part_num = info.tail_part_num; | |
539 | l.unlock(); | |
540 | std::uint64_t ofs = 0; | |
541 | auto a = ba::get_associated_allocator(init.completion_handler); | |
542 | auto e = ba::get_associated_executor(init.completion_handler); | |
543 | ||
544 | if (markstr) { | |
545 | auto marker = to_marker(*markstr); | |
546 | if (!marker) { | |
547 | ldout(r->cct(), 0) << __func__ | |
548 | << "(): failed to parse marker (" << *markstr | |
549 | << ")" << dendl; | |
550 | e.post(ca::bind_handler(std::move(init.completion_handler), | |
551 | errc::invalid_marker, | |
552 | std::vector<list_entry>{}, false), a); | |
553 | return init.result.get(); | |
554 | } | |
555 | part_num = marker->num; | |
556 | ofs = marker->ofs; | |
557 | } | |
558 | ||
559 | using handler_type = decltype(init.completion_handler); | |
560 | auto ls = ceph::allocate_unique<Lister<handler_type>>( | |
561 | a, this, part_num, ofs, max_entries, | |
562 | std::move(init.completion_handler)); | |
563 | ls.release()->list(); | |
564 | return init.result.get(); | |
565 | } | |
566 | ||
567 | /// Trim entries from the tail to the given position | |
568 | /// Signature: (bs::error_code) | |
569 | template<typename CT> | |
570 | auto trim(std::string_view markstr, //< Position to which to trim, inclusive | |
571 | bool exclusive, //< If true, trim markers up to but NOT INCLUDING | |
572 | //< markstr, otherwise trim markstr as well. | |
573 | CT&& ct //< CompletionToken | |
574 | ) { | |
575 | auto m = to_marker(markstr); | |
576 | ba::async_completion<CT, void(bs::error_code)> init(ct); | |
577 | auto a = ba::get_associated_allocator(init.completion_handler); | |
578 | auto e = ba::get_associated_executor(init.completion_handler); | |
579 | if (!m) { | |
580 | ldout(r->cct(), 0) << __func__ << "(): failed to parse marker: marker=" | |
581 | << markstr << dendl; | |
582 | e.post(ca::bind_handler(std::move(init.completion_handler), | |
583 | errc::invalid_marker), a); | |
584 | return init.result.get(); | |
585 | } else { | |
586 | using handler_type = decltype(init.completion_handler); | |
587 | auto t = ceph::allocate_unique<Trimmer<handler_type>>( | |
588 | a, this, m->num, m->ofs, exclusive, std::move(init.completion_handler)); | |
589 | t.release()->trim(); | |
590 | } | |
591 | return init.result.get(); | |
592 | } | |
593 | ||
594 | /// Get information about a specific partition | |
595 | /// Signature: (bs::error_code, part_info) | |
596 | /// | |
597 | /// part_info has the following entries | |
598 | /// tag - A random string identifying this partition. Used internally | |
599 | /// as a sanity check to make sure operations haven't been misdirected | |
600 | /// params - Data parameters, identical for every partition within a | |
601 | /// FIFO and the same as what is returned from get_part_layout() | |
602 | /// magic - A random magic number, used internally as a prefix to | |
603 | /// every entry stored on the OSD to ensure sync | |
604 | /// min_ofs - Offset of the first entry | |
605 | /// max_ofs - Offset of the highest entry | |
606 | /// min_index - Minimum entry index | |
607 | /// max_index - Maximum entry index | |
608 | /// max_time - Time of the latest push | |
609 | /// | |
610 | /// The difference between ofs and index is that ofs is a byte | |
611 | /// offset. Index is a count. Nothing really uses indices, but | |
612 | /// they're tracked and sanity-checked as an invariant on the OSD. | |
613 | /// | |
614 | /// max_ofs and max_time are the two that have been used externally | |
615 | /// so far. | |
616 | template<typename CT> | |
617 | auto get_part_info(int64_t part_num, // The number of the partition | |
618 | CT&& ct // CompletionToken | |
619 | ) { | |
620 | ||
621 | ba::async_completion<CT, void(bs::error_code, part_info)> init(ct); | |
622 | fifo::op::get_part_info gpi; | |
623 | cb::list in; | |
624 | encode(gpi, in); | |
625 | ReadOp op; | |
626 | auto e = ba::get_associated_executor(init.completion_handler, | |
627 | get_executor()); | |
628 | auto a = ba::get_associated_allocator(init.completion_handler); | |
629 | auto reply = ceph::allocate_unique< | |
630 | ExecDecodeCB<fifo::op::get_part_info_reply>>(a); | |
631 | ||
632 | op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in, | |
633 | std::ref(*reply)); | |
634 | std::unique_lock l(m); | |
635 | auto part_oid = info.part_oid(part_num); | |
636 | l.unlock(); | |
637 | r->execute(part_oid, ioc, std::move(op), nullptr, | |
638 | ca::bind_ea(e, a, | |
639 | PartInfoGetter(std::move(init.completion_handler), | |
640 | std::move(reply)))); | |
641 | return init.result.get(); | |
642 | } | |
643 | ||
644 | using executor_type = ba::executor; | |
645 | ||
646 | /// Return the default executor, as specified at creation. | |
647 | ba::executor get_executor() const { | |
648 | return executor; | |
649 | } | |
650 | ||
651 | private: | |
652 | template<typename Handler> | |
653 | friend class detail::JournalProcessor; | |
654 | RADOS* const r; | |
655 | const IOContext ioc; | |
656 | const Object oid; | |
657 | std::mutex m; | |
658 | ||
659 | fifo::info info; | |
660 | ||
661 | std::uint32_t part_header_size = 0xdeadbeef; | |
662 | std::uint32_t part_entry_overhead = 0xdeadbeef; | |
663 | ||
664 | ba::executor executor; | |
665 | ||
666 | std::optional<marker> to_marker(std::string_view s); | |
667 | ||
668 | template<typename Handler, typename T> | |
669 | static void assoc_delete(const Handler& handler, T* t) { | |
670 | typename std::allocator_traits<typename ba::associated_allocator<Handler>::type> | |
671 | ::template rebind_alloc<T> a( | |
672 | ba::get_associated_allocator(handler)); | |
673 | a.destroy(t); | |
674 | a.deallocate(t, 1); | |
675 | } | |
676 | ||
677 | FIFO(RADOS& r, | |
678 | IOContext ioc, | |
679 | Object oid, | |
680 | ba::executor executor) | |
681 | : r(&r), ioc(std::move(ioc)), oid(oid), executor(executor) {} | |
682 | ||
683 | std::string generate_tag() const; | |
684 | ||
685 | template <typename T> | |
686 | struct ExecDecodeCB { | |
687 | bs::error_code ec; | |
688 | T result; | |
689 | void operator()(bs::error_code e, const cb::list& r) { | |
690 | if (e) { | |
691 | ec = e; | |
692 | return; | |
693 | } | |
694 | try { | |
695 | auto p = r.begin(); | |
696 | using ceph::decode; | |
697 | decode(result, p); | |
698 | } catch (const cb::error& err) { | |
699 | ec = err.code(); | |
700 | } | |
701 | } | |
702 | }; | |
703 | ||
704 | template<typename Handler> | |
705 | class MetaReader { | |
706 | Handler handler; | |
707 | using allocator_type = boost::asio::associated_allocator_t<Handler>; | |
708 | using decoder_type = ExecDecodeCB<fifo::op::get_meta_reply>; | |
709 | using decoder_ptr = ceph::allocated_unique_ptr<decoder_type, allocator_type>; | |
710 | decoder_ptr decoder; | |
711 | public: | |
712 | MetaReader(Handler&& handler, decoder_ptr&& decoder) | |
713 | : handler(std::move(handler)), decoder(std::move(decoder)) {} | |
714 | ||
715 | void operator ()(bs::error_code ec) { | |
716 | if (!ec) { | |
717 | ec = decoder->ec; | |
718 | } | |
719 | auto reply = std::move(decoder->result); | |
720 | decoder.reset(); // free handler-allocated memory before dispatching | |
721 | ||
722 | std::move(handler)(ec, std::move(reply.info), | |
723 | std::move(reply.part_header_size), | |
724 | std::move(reply.part_entry_overhead)); | |
725 | } | |
726 | }; | |
727 | ||
728 | // Renamed to get around a compiler bug in Bionic that kept | |
729 | // complaining we weren't capturing 'this' to make a static function call. | |
730 | template<typename Handler> | |
731 | static void _read_meta_(RADOS* r, const Object& oid, const IOContext& ioc, | |
732 | std::optional<fifo::objv> objv, | |
733 | Handler&& handler, /* error_code, info, uint64, | |
734 | uint64 */ | |
735 | std::optional<ba::executor> executor = std::nullopt){ | |
736 | fifo::op::get_meta gm; | |
737 | ||
738 | gm.version = objv; | |
739 | ||
740 | cb::list in; | |
741 | encode(gm, in); | |
742 | ReadOp op; | |
743 | ||
744 | auto a = ba::get_associated_allocator(handler); | |
745 | auto reply = | |
746 | ceph::allocate_unique<ExecDecodeCB<fifo::op::get_meta_reply>>(a); | |
747 | ||
748 | auto e = ba::get_associated_executor(handler); | |
749 | op.exec(fifo::op::CLASS, fifo::op::GET_META, in, std::ref(*reply)); | |
750 | r->execute(oid, ioc, std::move(op), nullptr, | |
751 | ca::bind_ea(e, a, MetaReader(std::move(handler), | |
752 | std::move(reply)))); | |
753 | }; | |
754 | ||
755 | template<typename Handler> | |
756 | void _read_meta(Handler&& handler /* error_code */) { | |
757 | auto e = ba::get_associated_executor(handler, get_executor()); | |
758 | auto a = ba::get_associated_allocator(handler); | |
759 | _read_meta_(r, oid, ioc, | |
20effc67 | 760 | std::nullopt, |
f67539c2 TL |
761 | ca::bind_ea( |
762 | e, a, | |
763 | [this, | |
764 | handler = std::move(handler)](bs::error_code ec, | |
765 | fifo::info&& info, | |
766 | std::uint64_t phs, | |
767 | std::uint64_t peo) mutable { | |
768 | std::unique_lock l(m); | |
769 | if (ec) { | |
770 | l.unlock(); | |
771 | std::move(handler)(ec); | |
772 | return; | |
773 | } | |
774 | // We have a newer version already! | |
775 | if (!info.version.same_or_later(this->info.version)) { | |
776 | l.unlock(); | |
777 | std::move(handler)(bs::error_code{}); | |
778 | return; | |
779 | } | |
780 | this->info = std::move(info); | |
781 | part_header_size = phs; | |
782 | part_entry_overhead = peo; | |
783 | l.unlock(); | |
784 | std::move(handler)(bs::error_code{}); | |
785 | }), get_executor()); | |
786 | } | |
787 | ||
788 | bs::error_code apply_update(fifo::info* info, | |
789 | const fifo::objv& objv, | |
790 | const fifo::update& update); | |
791 | ||
792 | ||
793 | template<typename Handler> | |
794 | void _update_meta(const fifo::update& update, | |
795 | fifo::objv version, | |
796 | Handler&& handler /* error_code, bool */) { | |
797 | WriteOp op; | |
798 | ||
799 | cls::fifo::update_meta(op, info.version, update); | |
800 | ||
801 | auto a = ba::get_associated_allocator(handler); | |
802 | auto e = ba::get_associated_executor(handler, get_executor()); | |
803 | ||
804 | r->execute( | |
805 | oid, ioc, std::move(op), | |
806 | ca::bind_ea( | |
807 | e, a, | |
808 | [this, e, a, version, update, | |
809 | handler = std::move(handler)](bs::error_code ec) mutable { | |
810 | if (ec && ec != bs::errc::operation_canceled) { | |
811 | std::move(handler)(ec, bool{}); | |
812 | return; | |
813 | } | |
814 | ||
815 | auto canceled = (ec == bs::errc::operation_canceled); | |
816 | ||
817 | if (!canceled) { | |
818 | ec = apply_update(&info, | |
819 | version, | |
820 | update); | |
821 | if (ec) { | |
822 | canceled = true; | |
823 | } | |
824 | } | |
825 | ||
826 | if (canceled) { | |
827 | _read_meta( | |
828 | ca::bind_ea( | |
829 | e, a, | |
830 | [handler = std::move(handler)](bs::error_code ec) mutable { | |
831 | std::move(handler)(ec, ec ? false : true); | |
832 | })); | |
833 | return; | |
834 | } | |
835 | std::move(handler)(ec, false); | |
836 | return; | |
837 | })); | |
838 | } | |
839 | ||
840 | template<typename Handler> | |
841 | auto _process_journal(Handler&& handler /* error_code */) { | |
842 | auto a = ba::get_associated_allocator(std::ref(handler)); | |
843 | auto j = ceph::allocate_unique<detail::JournalProcessor<Handler>>( | |
844 | a, this, std::move(handler)); | |
845 | auto p = j.release(); | |
846 | p->process(); | |
847 | } | |
848 | ||
849 | template<typename Handler> | |
850 | class NewPartPreparer { | |
851 | FIFO* f; | |
852 | Handler handler; | |
853 | std::vector<fifo::journal_entry> jentries; | |
854 | int i; | |
855 | std::int64_t new_head_part_num; | |
856 | ||
857 | public: | |
858 | ||
859 | void operator ()(bs::error_code ec, bool canceled) { | |
860 | if (ec) { | |
861 | std::move(handler)(ec); | |
862 | return; | |
863 | } | |
864 | ||
865 | if (canceled) { | |
866 | std::unique_lock l(f->m); | |
867 | auto iter = f->info.journal.find(jentries.front().part_num); | |
868 | auto max_push_part_num = f->info.max_push_part_num; | |
869 | auto head_part_num = f->info.head_part_num; | |
870 | auto version = f->info.version; | |
871 | auto found = (iter != f->info.journal.end()); | |
872 | l.unlock(); | |
873 | if ((max_push_part_num >= jentries.front().part_num && | |
874 | head_part_num >= new_head_part_num)) { | |
875 | /* raced, but new part was already written */ | |
876 | std::move(handler)(bs::error_code{}); | |
877 | return; | |
878 | } | |
879 | if (i >= MAX_RACE_RETRIES) { | |
880 | std::move(handler)(errc::raced); | |
881 | return; | |
882 | } | |
883 | if (!found) { | |
884 | auto e = ba::get_associated_executor(handler, f->get_executor()); | |
885 | auto a = ba::get_associated_allocator(handler); | |
886 | f->_update_meta(fifo::update{} | |
887 | .journal_entries_add(jentries), | |
888 | version, | |
889 | ca::bind_ea( | |
890 | e, a, | |
891 | NewPartPreparer(f, std::move(handler), | |
892 | jentries, | |
893 | i + 1, new_head_part_num))); | |
894 | return; | |
895 | } | |
896 | // Fall through. We still need to process the journal. | |
897 | } | |
898 | f->_process_journal(std::move(handler)); | |
899 | return; | |
900 | } | |
901 | ||
902 | NewPartPreparer(FIFO* f, | |
903 | Handler&& handler, | |
904 | std::vector<fifo::journal_entry> jentries, | |
905 | int i, std::int64_t new_head_part_num) | |
906 | : f(f), handler(std::move(handler)), jentries(std::move(jentries)), | |
907 | i(i), new_head_part_num(new_head_part_num) {} | |
908 | }; | |
909 | ||
910 | template<typename Handler> | |
911 | void _prepare_new_part(bool is_head, | |
912 | Handler&& handler /* error_code */) { | |
913 | std::unique_lock l(m); | |
914 | std::vector jentries = { info.next_journal_entry(generate_tag()) }; | |
915 | std::int64_t new_head_part_num = info.head_part_num; | |
916 | auto version = info.version; | |
917 | ||
918 | if (is_head) { | |
919 | auto new_head_jentry = jentries.front(); | |
920 | new_head_jentry.op = fifo::journal_entry::Op::set_head; | |
921 | new_head_part_num = jentries.front().part_num; | |
922 | jentries.push_back(std::move(new_head_jentry)); | |
923 | } | |
924 | l.unlock(); | |
925 | ||
926 | auto e = ba::get_associated_executor(handler, get_executor()); | |
927 | auto a = ba::get_associated_allocator(handler); | |
928 | _update_meta(fifo::update{}.journal_entries_add(jentries), | |
929 | version, | |
930 | ca::bind_ea( | |
931 | e, a, | |
932 | NewPartPreparer(this, std::move(handler), | |
933 | jentries, 0, new_head_part_num))); | |
934 | } | |
935 | ||
936 | template<typename Handler> | |
937 | class NewHeadPreparer { | |
938 | FIFO* f; | |
939 | Handler handler; | |
940 | int i; | |
941 | std::int64_t new_head_num; | |
942 | ||
943 | public: | |
944 | ||
945 | void operator ()(bs::error_code ec, bool canceled) { | |
946 | std::unique_lock l(f->m); | |
947 | auto head_part_num = f->info.head_part_num; | |
948 | auto version = f->info.version; | |
949 | l.unlock(); | |
950 | ||
951 | if (ec) { | |
952 | std::move(handler)(ec); | |
953 | return; | |
954 | } | |
955 | if (canceled) { | |
956 | if (i >= MAX_RACE_RETRIES) { | |
957 | std::move(handler)(errc::raced); | |
958 | return; | |
959 | } | |
960 | ||
961 | // Raced, but there's still work to do! | |
962 | if (head_part_num < new_head_num) { | |
963 | auto e = ba::get_associated_executor(handler, f->get_executor()); | |
964 | auto a = ba::get_associated_allocator(handler); | |
965 | f->_update_meta(fifo::update{}.head_part_num(new_head_num), | |
966 | version, | |
967 | ca::bind_ea( | |
968 | e, a, | |
969 | NewHeadPreparer(f, std::move(handler), | |
970 | i + 1, | |
971 | new_head_num))); | |
972 | return; | |
973 | } | |
974 | } | |
975 | // Either we succeeded, or we were raced by someone who did it for us. | |
976 | std::move(handler)(bs::error_code{}); | |
977 | return; | |
978 | } | |
979 | ||
980 | NewHeadPreparer(FIFO* f, | |
981 | Handler&& handler, | |
982 | int i, std::int64_t new_head_num) | |
983 | : f(f), handler(std::move(handler)), i(i), new_head_num(new_head_num) {} | |
984 | }; | |
985 | ||
986 | template<typename Handler> | |
987 | void _prepare_new_head(Handler&& handler /* error_code */) { | |
988 | std::unique_lock l(m); | |
989 | int64_t new_head_num = info.head_part_num + 1; | |
990 | auto max_push_part_num = info.max_push_part_num; | |
991 | auto version = info.version; | |
992 | l.unlock(); | |
993 | ||
994 | if (max_push_part_num < new_head_num) { | |
995 | auto e = ba::get_associated_executor(handler, get_executor()); | |
996 | auto a = ba::get_associated_allocator(handler); | |
997 | _prepare_new_part( | |
998 | true, | |
999 | ca::bind_ea( | |
1000 | e, a, | |
1001 | [this, new_head_num, | |
1002 | handler = std::move(handler)](bs::error_code ec) mutable { | |
1003 | if (ec) { | |
1004 | handler(ec); | |
1005 | return; | |
1006 | } | |
1007 | std::unique_lock l(m); | |
1008 | if (info.max_push_part_num < new_head_num) { | |
1009 | l.unlock(); | |
1010 | ldout(r->cct(), 0) | |
1011 | << "ERROR: " << __func__ | |
1012 | << ": after new part creation: meta_info.max_push_part_num=" | |
1013 | << info.max_push_part_num << " new_head_num=" | |
1014 | << info.max_push_part_num << dendl; | |
1015 | std::move(handler)(errc::inconsistency); | |
1016 | } else { | |
1017 | l.unlock(); | |
1018 | std::move(handler)(bs::error_code{}); | |
1019 | } | |
1020 | })); | |
1021 | return; | |
1022 | } | |
1023 | auto e = ba::get_associated_executor(handler, get_executor()); | |
1024 | auto a = ba::get_associated_allocator(handler); | |
1025 | _update_meta(fifo::update{}.head_part_num(new_head_num), | |
1026 | version, | |
1027 | ca::bind_ea( | |
1028 | e, a, | |
1029 | NewHeadPreparer(this, std::move(handler), 0, | |
1030 | new_head_num))); | |
1031 | } | |
1032 | ||
1033 | template<typename T> | |
1034 | struct ExecHandleCB { | |
1035 | bs::error_code ec; | |
1036 | T result; | |
1037 | void operator()(bs::error_code e, const T& t) { | |
1038 | if (e) { | |
1039 | ec = e; | |
1040 | return; | |
1041 | } | |
1042 | result = t; | |
1043 | } | |
1044 | }; | |
1045 | ||
1046 | template<typename Handler> | |
1047 | class EntryPusher { | |
1048 | Handler handler; | |
1049 | using allocator_type = boost::asio::associated_allocator_t<Handler>; | |
1050 | using decoder_type = ExecHandleCB<int>; | |
1051 | using decoder_ptr = ceph::allocated_unique_ptr<decoder_type, allocator_type>; | |
1052 | decoder_ptr decoder; | |
1053 | ||
1054 | public: | |
1055 | ||
1056 | EntryPusher(Handler&& handler, decoder_ptr&& decoder) | |
1057 | : handler(std::move(handler)), decoder(std::move(decoder)) {} | |
1058 | ||
1059 | void operator ()(bs::error_code ec) { | |
1060 | if (!ec) { | |
1061 | ec = decoder->ec; | |
1062 | } | |
1063 | auto reply = std::move(decoder->result); | |
1064 | decoder.reset(); // free handler-allocated memory before dispatching | |
1065 | ||
1066 | std::move(handler)(ec, std::move(reply)); | |
1067 | } | |
1068 | }; | |
1069 | ||
1070 | template<typename Handler> | |
1071 | auto push_entries(const std::deque<cb::list>& data_bufs, | |
1072 | Handler&& handler /* error_code, int */) { | |
1073 | WriteOp op; | |
1074 | std::unique_lock l(m); | |
1075 | auto head_part_num = info.head_part_num; | |
1076 | auto tag = info.head_tag; | |
1077 | auto oid = info.part_oid(head_part_num); | |
1078 | l.unlock(); | |
1079 | ||
1080 | auto a = ba::get_associated_allocator(handler); | |
1081 | auto reply = ceph::allocate_unique<ExecHandleCB<int>>(a); | |
1082 | ||
1083 | auto e = ba::get_associated_executor(handler, get_executor()); | |
1084 | push_part(op, tag, data_bufs, std::ref(*reply)); | |
1085 | return r->execute(oid, ioc, std::move(op), | |
1086 | ca::bind_ea(e, a, EntryPusher(std::move(handler), | |
1087 | std::move(reply)))); | |
1088 | } | |
1089 | ||
1090 | template<typename CT> | |
1091 | auto trim_part(int64_t part_num, | |
1092 | uint64_t ofs, | |
1093 | std::optional<std::string_view> tag, | |
1094 | bool exclusive, | |
1095 | CT&& ct) { | |
1096 | WriteOp op; | |
1097 | cls::fifo::trim_part(op, tag, ofs, exclusive); | |
1098 | return r->execute(info.part_oid(part_num), ioc, std::move(op), | |
1099 | std::forward<CT>(ct)); | |
1100 | } | |
1101 | ||
1102 | ||
1103 | template<typename Handler> | |
1104 | class Pusher { | |
1105 | FIFO* f; | |
1106 | std::deque<cb::list> remaining; | |
1107 | std::deque<cb::list> batch; | |
1108 | int i; | |
1109 | Handler handler; | |
1110 | ||
1111 | void prep_then_push(const unsigned successes) { | |
1112 | std::unique_lock l(f->m); | |
1113 | auto max_part_size = f->info.params.max_part_size; | |
1114 | auto part_entry_overhead = f->part_entry_overhead; | |
1115 | l.unlock(); | |
1116 | ||
1117 | uint64_t batch_len = 0; | |
1118 | if (successes > 0) { | |
1119 | if (successes == batch.size()) { | |
1120 | batch.clear(); | |
1121 | } else { | |
1122 | batch.erase(batch.begin(), batch.begin() + successes); | |
1123 | for (const auto& b : batch) { | |
1124 | batch_len += b.length() + part_entry_overhead; | |
1125 | } | |
1126 | } | |
1127 | } | |
1128 | ||
1129 | if (batch.empty() && remaining.empty()) { | |
1130 | std::move(handler)(bs::error_code{}); | |
1131 | return; | |
1132 | } | |
1133 | ||
1134 | while (!remaining.empty() && | |
1135 | (remaining.front().length() + batch_len <= max_part_size)) { | |
1136 | ||
1137 | /* We can send entries with data_len up to max_entry_size, | |
1138 | however, we want to also account the overhead when | |
1139 | dealing with multiple entries. Previous check doesn't | |
1140 | account for overhead on purpose. */ | |
1141 | batch_len += remaining.front().length() + part_entry_overhead; | |
1142 | batch.push_back(std::move(remaining.front())); | |
1143 | remaining.pop_front(); | |
1144 | } | |
1145 | push(); | |
1146 | } | |
1147 | ||
1148 | void push() { | |
1149 | auto e = ba::get_associated_executor(handler, f->get_executor()); | |
1150 | auto a = ba::get_associated_allocator(handler); | |
1151 | f->push_entries(batch, | |
1152 | ca::bind_ea(e, a, | |
1153 | Pusher(f, std::move(remaining), | |
1154 | batch, i, | |
1155 | std::move(handler)))); | |
1156 | } | |
1157 | ||
1158 | public: | |
1159 | ||
1160 | // Initial call! | |
1161 | void operator ()() { | |
1162 | prep_then_push(0); | |
1163 | } | |
1164 | ||
1165 | // Called with response to push_entries | |
1166 | void operator ()(bs::error_code ec, int r) { | |
1167 | if (ec == bs::errc::result_out_of_range) { | |
1168 | auto e = ba::get_associated_executor(handler, f->get_executor()); | |
1169 | auto a = ba::get_associated_allocator(handler); | |
1170 | f->_prepare_new_head( | |
1171 | ca::bind_ea(e, a, | |
1172 | Pusher(f, std::move(remaining), | |
1173 | std::move(batch), i, | |
1174 | std::move(handler)))); | |
1175 | return; | |
1176 | } | |
1177 | if (ec) { | |
1178 | std::move(handler)(ec); | |
1179 | return; | |
1180 | } | |
1181 | i = 0; // We've made forward progress, so reset the race counter! | |
1182 | prep_then_push(r); | |
1183 | } | |
1184 | ||
1185 | // Called with response to prepare_new_head | |
1186 | void operator ()(bs::error_code ec) { | |
1187 | if (ec == bs::errc::operation_canceled) { | |
1188 | if (i == MAX_RACE_RETRIES) { | |
1189 | ldout(f->r->cct(), 0) | |
1190 | << "ERROR: " << __func__ | |
1191 | << "(): race check failed too many times, likely a bug" << dendl; | |
1192 | std::move(handler)(make_error_code(errc::raced)); | |
1193 | return; | |
1194 | } | |
1195 | ++i; | |
1196 | } else if (ec) { | |
1197 | std::move(handler)(ec); | |
1198 | return; | |
1199 | } | |
1200 | ||
1201 | if (batch.empty()) { | |
1202 | prep_then_push(0); | |
1203 | return; | |
1204 | } else { | |
1205 | push(); | |
1206 | return; | |
1207 | } | |
1208 | } | |
1209 | ||
1210 | Pusher(FIFO* f, std::deque<cb::list>&& remaining, | |
1211 | std::deque<cb::list> batch, int i, | |
1212 | Handler&& handler) | |
1213 | : f(f), remaining(std::move(remaining)), | |
1214 | batch(std::move(batch)), i(i), | |
1215 | handler(std::move(handler)) {} | |
1216 | }; | |
1217 | ||
1218 | template<typename Handler> | |
1219 | class Lister { | |
1220 | FIFO* f; | |
1221 | std::vector<list_entry> result; | |
1222 | bool more = false; | |
1223 | std::int64_t part_num; | |
1224 | std::uint64_t ofs; | |
1225 | int max_entries; | |
1226 | bs::error_code ec_out; | |
1227 | std::vector<fifo::part_list_entry> entries; | |
1228 | bool part_more = false; | |
1229 | bool part_full = false; | |
1230 | Handler handler; | |
1231 | ||
1232 | void handle(bs::error_code ec) { | |
1233 | auto h = std::move(handler); | |
1234 | auto m = more; | |
1235 | auto r = std::move(result); | |
1236 | ||
1237 | FIFO::assoc_delete(h, this); | |
1238 | std::move(h)(ec, std::move(r), m); | |
1239 | } | |
1240 | ||
1241 | public: | |
1242 | Lister(FIFO* f, std::int64_t part_num, std::uint64_t ofs, int max_entries, | |
1243 | Handler&& handler) | |
1244 | : f(f), part_num(part_num), ofs(ofs), max_entries(max_entries), | |
1245 | handler(std::move(handler)) { | |
1246 | result.reserve(max_entries); | |
1247 | } | |
1248 | ||
1249 | ||
1250 | Lister(const Lister&) = delete; | |
1251 | Lister& operator =(const Lister&) = delete; | |
1252 | Lister(Lister&&) = delete; | |
1253 | Lister& operator =(Lister&&) = delete; | |
1254 | ||
1255 | void list() { | |
1256 | if (max_entries > 0) { | |
1257 | ReadOp op; | |
1258 | ec_out.clear(); | |
1259 | part_more = false; | |
1260 | part_full = false; | |
1261 | entries.clear(); | |
1262 | ||
1263 | std::unique_lock l(f->m); | |
1264 | auto part_oid = f->info.part_oid(part_num); | |
1265 | l.unlock(); | |
1266 | ||
1267 | list_part(op, | |
1268 | {}, | |
1269 | ofs, | |
1270 | max_entries, | |
1271 | &ec_out, | |
1272 | &entries, | |
1273 | &part_more, | |
1274 | &part_full, | |
1275 | nullptr); | |
1276 | auto e = ba::get_associated_executor(handler, f->get_executor()); | |
1277 | auto a = ba::get_associated_allocator(handler); | |
1278 | f->r->execute( | |
1279 | part_oid, | |
1280 | f->ioc, | |
1281 | std::move(op), | |
1282 | nullptr, | |
1283 | ca::bind_ea( | |
1284 | e, a, | |
1285 | [t = std::unique_ptr<Lister>(this), this, | |
1286 | part_oid](bs::error_code ec) mutable { | |
1287 | t.release(); | |
1288 | if (ec == bs::errc::no_such_file_or_directory) { | |
1289 | auto e = ba::get_associated_executor(handler, | |
1290 | f->get_executor()); | |
1291 | auto a = ba::get_associated_allocator(handler); | |
1292 | f->_read_meta( | |
1293 | ca::bind_ea( | |
1294 | e, a, | |
1295 | [this](bs::error_code ec) mutable { | |
1296 | if (ec) { | |
1297 | handle(ec); | |
1298 | return; | |
1299 | } | |
1300 | ||
1301 | if (part_num < f->info.tail_part_num) { | |
1302 | /* raced with trim? restart */ | |
1303 | max_entries += result.size(); | |
1304 | result.clear(); | |
1305 | part_num = f->info.tail_part_num; | |
1306 | ofs = 0; | |
1307 | list(); | |
1308 | } | |
1309 | /* assuming part was not written yet, so end of data */ | |
1310 | more = false; | |
1311 | handle({}); | |
1312 | return; | |
1313 | })); | |
1314 | return; | |
1315 | } | |
1316 | if (ec) { | |
1317 | ldout(f->r->cct(), 0) | |
1318 | << __func__ | |
1319 | << "(): list_part() on oid=" << part_oid | |
1320 | << " returned ec=" << ec.message() << dendl; | |
1321 | handle(ec); | |
1322 | return; | |
1323 | } | |
1324 | if (ec_out) { | |
1325 | ldout(f->r->cct(), 0) | |
1326 | << __func__ | |
1327 | << "(): list_part() on oid=" << f->info.part_oid(part_num) | |
1328 | << " returned ec=" << ec_out.message() << dendl; | |
1329 | handle(ec_out); | |
1330 | return; | |
1331 | } | |
1332 | ||
1333 | more = part_full || part_more; | |
1334 | for (auto& entry : entries) { | |
1335 | list_entry e; | |
1336 | e.data = std::move(entry.data); | |
1337 | e.marker = marker{part_num, entry.ofs}.to_string(); | |
1338 | e.mtime = entry.mtime; | |
1339 | result.push_back(std::move(e)); | |
1340 | } | |
1341 | max_entries -= entries.size(); | |
1342 | entries.clear(); | |
1343 | if (max_entries > 0 && | |
1344 | part_more) { | |
1345 | list(); | |
1346 | return; | |
1347 | } | |
1348 | ||
1349 | if (!part_full) { /* head part is not full */ | |
1350 | handle({}); | |
1351 | return; | |
1352 | } | |
1353 | ++part_num; | |
1354 | ofs = 0; | |
1355 | list(); | |
1356 | })); | |
1357 | } else { | |
1358 | handle({}); | |
1359 | return; | |
1360 | } | |
1361 | } | |
1362 | }; | |
1363 | ||
1364 | template<typename Handler> | |
1365 | class Trimmer { | |
1366 | FIFO* f; | |
1367 | std::int64_t part_num; | |
1368 | std::uint64_t ofs; | |
1369 | bool exclusive; | |
1370 | Handler handler; | |
1371 | std::int64_t pn; | |
1372 | int i = 0; | |
1373 | ||
1374 | void handle(bs::error_code ec) { | |
1375 | auto h = std::move(handler); | |
1376 | ||
1377 | FIFO::assoc_delete(h, this); | |
1378 | return std::move(h)(ec); | |
1379 | } | |
1380 | ||
1381 | void update() { | |
1382 | std::unique_lock l(f->m); | |
1383 | auto objv = f->info.version; | |
1384 | l.unlock(); | |
1385 | auto a = ba::get_associated_allocator(handler); | |
1386 | auto e = ba::get_associated_executor(handler, f->get_executor()); | |
1387 | f->_update_meta( | |
1388 | fifo::update{}.tail_part_num(part_num), | |
1389 | objv, | |
1390 | ca::bind_ea( | |
1391 | e, a, | |
1392 | [this, t = std::unique_ptr<Trimmer>(this)](bs::error_code ec, | |
1393 | bool canceled) mutable { | |
1394 | t.release(); | |
1395 | if (canceled) | |
1396 | if (i >= MAX_RACE_RETRIES) { | |
1397 | ldout(f->r->cct(), 0) | |
1398 | << "ERROR: " << __func__ | |
1399 | << "(): race check failed too many times, likely a bug" | |
1400 | << dendl; | |
1401 | handle(errc::raced); | |
1402 | return; | |
1403 | } | |
1404 | std::unique_lock l(f->m); | |
1405 | auto tail_part_num = f->info.tail_part_num; | |
1406 | l.unlock(); | |
1407 | if (tail_part_num < part_num) { | |
1408 | ++i; | |
1409 | update(); | |
1410 | return; | |
1411 | } | |
1412 | handle({}); | |
1413 | return; | |
1414 | })); | |
1415 | } | |
1416 | ||
1417 | public: | |
1418 | Trimmer(FIFO* f, std::int64_t part_num, std::uint64_t ofs, | |
1419 | bool exclusive, Handler&& handler) | |
1420 | : f(f), part_num(part_num), ofs(ofs), exclusive(exclusive), | |
1421 | handler(std::move(handler)) { | |
1422 | std::unique_lock l(f->m); | |
1423 | pn = f->info.tail_part_num; | |
1424 | } | |
1425 | ||
1426 | void trim() { | |
1427 | auto a = ba::get_associated_allocator(handler); | |
1428 | auto e = ba::get_associated_executor(handler, f->get_executor()); | |
1429 | if (pn < part_num) { | |
1430 | std::unique_lock l(f->m); | |
1431 | auto max_part_size = f->info.params.max_part_size; | |
1432 | l.unlock(); | |
1433 | f->trim_part( | |
1434 | pn, max_part_size, std::nullopt, | |
1435 | false, | |
1436 | ca::bind_ea( | |
1437 | e, a, | |
1438 | [t = std::unique_ptr<Trimmer>(this), | |
1439 | this](bs::error_code ec) mutable { | |
1440 | t.release(); | |
1441 | if (ec && ec != bs::errc::no_such_file_or_directory) { | |
1442 | ldout(f->r->cct(), 0) | |
1443 | << __func__ << "(): ERROR: trim_part() on part=" | |
1444 | << pn << " returned ec=" << ec.message() << dendl; | |
1445 | handle(ec); | |
1446 | return; | |
1447 | } | |
1448 | ++pn; | |
1449 | trim(); | |
1450 | })); | |
1451 | return; | |
1452 | } | |
1453 | f->trim_part( | |
1454 | part_num, ofs, std::nullopt, exclusive, | |
1455 | ca::bind_ea( | |
1456 | e, a, | |
1457 | [t = std::unique_ptr<Trimmer>(this), | |
1458 | this](bs::error_code ec) mutable { | |
1459 | t.release(); | |
1460 | if (ec && ec != bs::errc::no_such_file_or_directory) { | |
1461 | ldout(f->r->cct(), 0) | |
1462 | << __func__ << "(): ERROR: trim_part() on part=" << part_num | |
1463 | << " returned ec=" << ec.message() << dendl; | |
1464 | handle(ec); | |
1465 | return; | |
1466 | } | |
1467 | std::unique_lock l(f->m); | |
1468 | auto tail_part_num = f->info.tail_part_num; | |
1469 | l.unlock(); | |
1470 | if (part_num <= tail_part_num) { | |
1471 | /* don't need to modify meta info */ | |
1472 | handle({}); | |
1473 | return; | |
1474 | } | |
1475 | update(); | |
1476 | })); | |
1477 | } | |
1478 | }; | |
1479 | ||
1480 | template<typename Handler> | |
1481 | class PartInfoGetter { | |
1482 | Handler handler; | |
1483 | using allocator_type = boost::asio::associated_allocator_t<Handler>; | |
1484 | using decoder_type = ExecDecodeCB<fifo::op::get_part_info_reply>; | |
1485 | using decoder_ptr = ceph::allocated_unique_ptr<decoder_type, allocator_type>; | |
1486 | decoder_ptr decoder; | |
1487 | public: | |
1488 | PartInfoGetter(Handler&& handler, decoder_ptr&& decoder) | |
1489 | : handler(std::move(handler)), decoder(std::move(decoder)) {} | |
1490 | ||
1491 | void operator ()(bs::error_code ec) { | |
1492 | if (!ec) { | |
1493 | ec = decoder->ec; | |
1494 | } | |
1495 | auto reply = std::move(decoder->result); | |
1496 | decoder.reset(); // free handler-allocated memory before dispatching | |
1497 | ||
1498 | auto p = ca::bind_handler(std::move(handler), | |
1499 | ec, std::move(reply.header)); | |
1500 | std::move(p)(); | |
1501 | } | |
1502 | }; | |
1503 | ||
1504 | ||
1505 | }; | |
1506 | ||
1507 | namespace detail { | |
1508 | template<typename Handler> | |
1509 | class JournalProcessor { | |
1510 | private: | |
1511 | FIFO* const fifo; | |
1512 | Handler handler; | |
1513 | ||
1514 | std::vector<fifo::journal_entry> processed; | |
1515 | std::multimap<std::int64_t, fifo::journal_entry> journal; | |
1516 | std::multimap<std::int64_t, fifo::journal_entry>::iterator iter; | |
1517 | std::int64_t new_tail; | |
1518 | std::int64_t new_head; | |
1519 | std::int64_t new_max; | |
1520 | int race_retries = 0; | |
1521 | ||
1522 | template<typename CT> | |
1523 | auto create_part(int64_t part_num, std::string_view tag, CT&& ct) { | |
1524 | WriteOp op; | |
1525 | op.create(false); /* We don't need exclusivity, part_init ensures | |
1526 | we're creating from the same journal entry. */ | |
1527 | std::unique_lock l(fifo->m); | |
1528 | part_init(op, tag, fifo->info.params); | |
1529 | auto oid = fifo->info.part_oid(part_num); | |
1530 | l.unlock(); | |
1531 | return fifo->r->execute(oid, fifo->ioc, | |
1532 | std::move(op), std::forward<CT>(ct)); | |
1533 | } | |
1534 | ||
1535 | template<typename CT> | |
1536 | auto remove_part(int64_t part_num, std::string_view tag, CT&& ct) { | |
1537 | WriteOp op; | |
1538 | op.remove(); | |
1539 | std::unique_lock l(fifo->m); | |
1540 | auto oid = fifo->info.part_oid(part_num); | |
1541 | l.unlock(); | |
1542 | return fifo->r->execute(oid, fifo->ioc, | |
1543 | std::move(op), std::forward<CT>(ct)); | |
1544 | } | |
1545 | ||
1546 | template<typename PP> | |
1547 | void process_journal_entry(const fifo::journal_entry& entry, | |
1548 | PP&& pp) { | |
1549 | switch (entry.op) { | |
1550 | case fifo::journal_entry::Op::unknown: | |
1551 | std::move(pp)(errc::inconsistency); | |
1552 | return; | |
1553 | break; | |
1554 | ||
1555 | case fifo::journal_entry::Op::create: | |
1556 | create_part(entry.part_num, entry.part_tag, std::move(pp)); | |
1557 | return; | |
1558 | break; | |
1559 | case fifo::journal_entry::Op::set_head: | |
1560 | ba::post(ba::get_associated_executor(handler, fifo->get_executor()), | |
1561 | [pp = std::move(pp)]() mutable { | |
1562 | std::move(pp)(bs::error_code{}); | |
1563 | }); | |
1564 | return; | |
1565 | break; | |
1566 | case fifo::journal_entry::Op::remove: | |
1567 | remove_part(entry.part_num, entry.part_tag, std::move(pp)); | |
1568 | return; | |
1569 | break; | |
1570 | } | |
1571 | std::move(pp)(errc::inconsistency); | |
1572 | return; | |
1573 | } | |
1574 | ||
1575 | auto journal_entry_finisher(const fifo::journal_entry& entry) { | |
1576 | auto a = ba::get_associated_allocator(handler); | |
1577 | auto e = ba::get_associated_executor(handler, fifo->get_executor()); | |
1578 | return | |
1579 | ca::bind_ea( | |
1580 | e, a, | |
1581 | [t = std::unique_ptr<JournalProcessor>(this), this, | |
1582 | entry](bs::error_code ec) mutable { | |
1583 | t.release(); | |
1584 | if (entry.op == fifo::journal_entry::Op::remove && | |
1585 | ec == bs::errc::no_such_file_or_directory) | |
1586 | ec.clear(); | |
1587 | ||
1588 | if (ec) { | |
1589 | ldout(fifo->r->cct(), 0) | |
1590 | << __func__ | |
1591 | << "(): ERROR: failed processing journal entry for part=" | |
1592 | << entry.part_num << " with error " << ec.message() | |
1593 | << " Bug or inconsistency." << dendl; | |
1594 | handle(errc::inconsistency); | |
1595 | return; | |
1596 | } else { | |
1597 | switch (entry.op) { | |
1598 | case fifo::journal_entry::Op::unknown: | |
1599 | // Can't happen. Filtered out in process_journal_entry. | |
1600 | abort(); | |
1601 | break; | |
1602 | ||
1603 | case fifo::journal_entry::Op::create: | |
1604 | if (entry.part_num > new_max) { | |
1605 | new_max = entry.part_num; | |
1606 | } | |
1607 | break; | |
1608 | case fifo::journal_entry::Op::set_head: | |
1609 | if (entry.part_num > new_head) { | |
1610 | new_head = entry.part_num; | |
1611 | } | |
1612 | break; | |
1613 | case fifo::journal_entry::Op::remove: | |
1614 | if (entry.part_num >= new_tail) { | |
1615 | new_tail = entry.part_num + 1; | |
1616 | } | |
1617 | break; | |
1618 | } | |
1619 | processed.push_back(entry); | |
1620 | } | |
1621 | ++iter; | |
1622 | process(); | |
1623 | }); | |
1624 | } | |
1625 | ||
1626 | struct JournalPostprocessor { | |
1627 | std::unique_ptr<JournalProcessor> j_; | |
1628 | bool first; | |
1629 | void operator ()(bs::error_code ec, bool canceled) { | |
1630 | std::optional<int64_t> tail_part_num; | |
1631 | std::optional<int64_t> head_part_num; | |
1632 | std::optional<int64_t> max_part_num; | |
1633 | ||
1634 | auto j = j_.release(); | |
1635 | ||
1636 | if (!first && !ec && !canceled) { | |
1637 | j->handle({}); | |
1638 | return; | |
1639 | } | |
1640 | ||
1641 | if (canceled) { | |
1642 | if (j->race_retries >= MAX_RACE_RETRIES) { | |
1643 | ldout(j->fifo->r->cct(), 0) << "ERROR: " << __func__ << | |
1644 | "(): race check failed too many times, likely a bug" << dendl; | |
1645 | j->handle(errc::raced); | |
1646 | return; | |
1647 | } | |
1648 | ||
1649 | ++j->race_retries; | |
1650 | ||
1651 | std::vector<fifo::journal_entry> new_processed; | |
1652 | std::unique_lock l(j->fifo->m); | |
1653 | for (auto& e : j->processed) { | |
1654 | auto jiter = j->fifo->info.journal.find(e.part_num); | |
1655 | /* journal entry was already processed */ | |
1656 | if (jiter == j->fifo->info.journal.end() || | |
1657 | !(jiter->second == e)) { | |
1658 | continue; | |
1659 | } | |
1660 | new_processed.push_back(e); | |
1661 | } | |
1662 | j->processed = std::move(new_processed); | |
1663 | } | |
1664 | ||
1665 | std::unique_lock l(j->fifo->m); | |
1666 | auto objv = j->fifo->info.version; | |
1667 | if (j->new_tail > j->fifo->info.tail_part_num) { | |
1668 | tail_part_num = j->new_tail; | |
1669 | } | |
1670 | ||
1671 | if (j->new_head > j->fifo->info.head_part_num) { | |
1672 | head_part_num = j->new_head; | |
1673 | } | |
1674 | ||
1675 | if (j->new_max > j->fifo->info.max_push_part_num) { | |
1676 | max_part_num = j->new_max; | |
1677 | } | |
1678 | l.unlock(); | |
1679 | ||
1680 | if (j->processed.empty() && | |
1681 | !tail_part_num && | |
1682 | !max_part_num) { | |
1683 | /* nothing to update anymore */ | |
1684 | j->handle({}); | |
1685 | return; | |
1686 | } | |
1687 | auto a = ba::get_associated_allocator(j->handler); | |
1688 | auto e = ba::get_associated_executor(j->handler, j->fifo->get_executor()); | |
1689 | j->fifo->_update_meta(fifo::update{} | |
1690 | .tail_part_num(tail_part_num) | |
1691 | .head_part_num(head_part_num) | |
1692 | .max_push_part_num(max_part_num) | |
1693 | .journal_entries_rm(j->processed), | |
1694 | objv, | |
1695 | ca::bind_ea( | |
1696 | e, a, | |
1697 | JournalPostprocessor{j, false})); | |
1698 | return; | |
1699 | } | |
1700 | ||
1701 | JournalPostprocessor(JournalProcessor* j, bool first) | |
1702 | : j_(j), first(first) {} | |
1703 | }; | |
1704 | ||
1705 | void postprocess() { | |
1706 | if (processed.empty()) { | |
1707 | handle({}); | |
1708 | return; | |
1709 | } | |
1710 | JournalPostprocessor(this, true)({}, false); | |
1711 | } | |
1712 | ||
1713 | void handle(bs::error_code ec) { | |
1714 | auto e = ba::get_associated_executor(handler, fifo->get_executor()); | |
1715 | auto a = ba::get_associated_allocator(handler); | |
1716 | auto h = std::move(handler); | |
1717 | FIFO::assoc_delete(h, this); | |
1718 | e.dispatch(ca::bind_handler(std::move(h), ec), a); | |
1719 | return; | |
1720 | } | |
1721 | ||
1722 | public: | |
1723 | ||
1724 | JournalProcessor(FIFO* fifo, Handler&& handler) | |
1725 | : fifo(fifo), handler(std::move(handler)) { | |
1726 | std::unique_lock l(fifo->m); | |
1727 | journal = fifo->info.journal; | |
1728 | iter = journal.begin(); | |
1729 | new_tail = fifo->info.tail_part_num; | |
1730 | new_head = fifo->info.head_part_num; | |
1731 | new_max = fifo->info.max_push_part_num; | |
1732 | } | |
1733 | ||
1734 | JournalProcessor(const JournalProcessor&) = delete; | |
1735 | JournalProcessor& operator =(const JournalProcessor&) = delete; | |
1736 | JournalProcessor(JournalProcessor&&) = delete; | |
1737 | JournalProcessor& operator =(JournalProcessor&&) = delete; | |
1738 | ||
1739 | void process() { | |
1740 | if (iter != journal.end()) { | |
1741 | const auto entry = iter->second; | |
1742 | process_journal_entry(entry, | |
1743 | journal_entry_finisher(entry)); | |
1744 | return; | |
1745 | } else { | |
1746 | postprocess(); | |
1747 | return; | |
1748 | } | |
1749 | } | |
1750 | }; | |
1751 | } | |
1752 | } | |
1753 | ||
1754 | #endif // CEPH_RADOS_CLS_FIFIO_H |