]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2020 Red Hat <contact@redhat.com> | |
7 | * Author: Adam C. Emerson | |
8 | * | |
9 | * This is free software; you can redistribute it and/or | |
10 | * modify it under the terms of the GNU Lesser General Public | |
11 | * License version 2.1, as published by the Free Software | |
12 | * Foundation. See file COPYING. | |
13 | * | |
14 | */ | |
15 | ||
16 | #ifndef CEPH_RGW_CLS_FIFO_LEGACY_H | |
17 | #define CEPH_RGW_CLS_FIFO_LEGACY_H | |
18 | ||
19 | #include <cstdint> | |
20 | #include <deque> | |
21 | #include <map> | |
22 | #include <memory> | |
23 | #include <mutex> | |
24 | #include <optional> | |
25 | #include <string_view> | |
26 | #include <vector> | |
27 | ||
28 | #undef FMT_HEADER_ONLY | |
29 | #define FMT_HEADER_ONLY 1 | |
30 | #include <fmt/format.h> | |
31 | ||
32 | #include "include/rados/librados.hpp" | |
33 | #include "include/buffer.h" | |
34 | #include "include/function2.hpp" | |
35 | ||
36 | #include "common/async/yield_context.h" | |
37 | ||
38 | #include "cls/fifo/cls_fifo_types.h" | |
39 | #include "cls/fifo/cls_fifo_ops.h" | |
40 | ||
41 | #include "librados/AioCompletionImpl.h" | |
42 | ||
43 | #include "rgw_tools.h" | |
44 | ||
45 | namespace rgw::cls::fifo { | |
// Short aliases used throughout this header for the buffer, cls_fifo
// and librados namespaces.
46 | namespace cb = ceph::buffer; | |
47 | namespace fifo = rados::cls::fifo; | |
48 | namespace lr = librados; | |
49 | ||
// Default size limits, in bytes: 4 MiB per part and 32 KiB per entry.
// They are the default arguments of create_meta() and FIFO::create().
50 | inline constexpr std::uint64_t default_max_part_size = 4 * 1024 * 1024; | |
51 | inline constexpr std::uint64_t default_max_entry_size = 32 * 1024; | |
52 | ||
/// Add a FIFO metadata creation call to the write operation `op`.
/// NOTE(review): only the declaration is visible here; the parameter
/// meanings below are taken from the matching arguments documented on
/// FIFO::create() and should be confirmed against the definition.
///  - objv: if set, presumably the version the FIFO must match
///  - oid_prefix: presumably the prefix for all objects of the FIFO
///  - exclusive: presumably fail if the FIFO already exists
///  - max_part_size / max_entry_size: size limits, defaulting to
///    default_max_part_size / default_max_entry_size
53 | void create_meta(lr::ObjectWriteOperation* op, std::string_view id, | |
54 | std::optional<fifo::objv> objv, | |
55 | std::optional<std::string_view> oid_prefix, | |
56 | bool exclusive = false, | |
57 | std::uint64_t max_part_size = default_max_part_size, | |
58 | std::uint64_t max_entry_size = default_max_entry_size); | |
/// Fetch FIFO metadata for the object `oid` through `ioctx`.
/// `info`, `part_header_size` and `part_entry_overhead` are passed as
/// non-const pointers and so look like outputs; `y` selects
/// yielding or blocking behaviour (see optional_yield).
/// NOTE(review): only the declaration is visible here. Presumably
/// returns 0 or a negative error code, and `probe` suppresses error
/// logging when the object is missing (as documented on FIFO::open())
/// -- confirm against the definition.
b3b6e05e | 59 | int get_meta(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid, |
f67539c2 TL |
60 | std::optional<fifo::objv> objv, fifo::info* info, |
61 | std::uint32_t* part_header_size, | |
62 | std::uint32_t* part_entry_overhead, | |
63 | std::uint64_t tid, optional_yield y, | |
64 | bool probe = false); | |
f67539c2 TL |
65 | struct marker { |
66 | std::int64_t num = 0; | |
67 | std::uint64_t ofs = 0; | |
68 | ||
69 | marker() = default; | |
70 | marker(std::int64_t num, std::uint64_t ofs) : num(num), ofs(ofs) {} | |
71 | static marker max() { | |
72 | return { std::numeric_limits<decltype(num)>::max(), | |
73 | std::numeric_limits<decltype(ofs)>::max() }; | |
74 | } | |
75 | ||
76 | std::string to_string() { | |
77 | return fmt::format("{:0>20}:{:0>20}", num, ofs); | |
78 | } | |
79 | }; | |
80 | ||
/// One element of the output vector filled in by FIFO::list().
81 | struct list_entry { | |
// Buffer list holding the entry (presumably the pushed payload).
82 | cb::list data; | |
// Marker in string form; presumably what FIFO::list()/FIFO::trim()
// accept as `markstr` -- confirm against the implementation.
83 | std::string marker; | |
// Timestamp associated with the entry (exact meaning not visible here).
84 | ceph::real_time mtime; | |
85 | }; | |
86 | ||
// Local name for the part header type from the cls_fifo namespace.
87 | using part_info = fifo::part_header; | |
88 | ||
89 | /// This is an implementation of FIFO using librados to facilitate | |
90 | /// backports. Please see /src/neorados/cls/fifo.h for full | |
91 | /// information. | |
92 | /// | |
93 | /// This library uses optional_yield. Please see | |
94 | /// /src/common/async/yield_context.h. In summary, optional_yield | |
95 | /// contains either a spawn::yield_context (in which case the current | |
96 | /// coroutine is suspended until completion) or null_yield (in which | |
97 | /// case the current thread is blocked until completion.) | |
98 | /// | |
99 | /// Please see the librados documentation for information on | |
100 | /// AioCompletion and IoCtx. | |
101 | ||
102 | class FIFO { | |
103 | friend struct Reader; | |
104 | friend struct Updater; | |
105 | friend struct Trimmer; | |
106 | friend struct InfoGetter; | |
107 | friend struct Pusher; | |
108 | friend struct NewPartPreparer; | |
109 | friend struct NewHeadPreparer; | |
110 | friend struct JournalProcessor; | |
111 | friend struct Lister; | |
112 | ||
113 | mutable lr::IoCtx ioctx; | |
114 | CephContext* cct = static_cast<CephContext*>(ioctx.cct()); | |
115 | const std::string oid; | |
116 | std::mutex m; | |
117 | std::uint64_t next_tid = 0; | |
118 | ||
119 | fifo::info info; | |
120 | ||
121 | std::uint32_t part_header_size = 0xdeadbeef; | |
122 | std::uint32_t part_entry_overhead = 0xdeadbeef; | |
123 | ||
124 | std::optional<marker> to_marker(std::string_view s); | |
125 | ||
126 | FIFO(lr::IoCtx&& ioc, | |
127 | std::string oid) | |
128 | : ioctx(std::move(ioc)), oid(oid) {} | |
129 | ||
20effc67 TL |
130 | int apply_update(const DoutPrefixProvider *dpp, |
131 | fifo::info* info, | |
f67539c2 TL |
132 | const fifo::objv& objv, |
133 | const fifo::update& update, | |
134 | std::uint64_t tid); | |
b3b6e05e | 135 | int _update_meta(const DoutPrefixProvider *dpp, const fifo::update& update, |
f67539c2 TL |
136 | fifo::objv version, bool* pcanceled, |
137 | std::uint64_t tid, optional_yield y); | |
b3b6e05e | 138 | void _update_meta(const DoutPrefixProvider *dpp, const fifo::update& update, |
f67539c2 TL |
139 | fifo::objv version, bool* pcanceled, |
140 | std::uint64_t tid, lr::AioCompletion* c); | |
39ae355f | 141 | int create_part(const DoutPrefixProvider *dpp, int64_t part_num, std::uint64_t tid, |
f67539c2 | 142 | optional_yield y); |
39ae355f | 143 | int remove_part(const DoutPrefixProvider *dpp, int64_t part_num, std::uint64_t tid, |
f67539c2 | 144 | optional_yield y); |
b3b6e05e TL |
145 | int process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y); |
146 | void process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c); | |
39ae355f TL |
147 | int _prepare_new_part(const DoutPrefixProvider *dpp, std::int64_t new_part_num, bool is_head, std::uint64_t tid, optional_yield y); |
148 | void _prepare_new_part(const DoutPrefixProvider *dpp, std::int64_t new_part_num, bool is_head, std::uint64_t tid, lr::AioCompletion* c); | |
149 | int _prepare_new_head(const DoutPrefixProvider *dpp, std::int64_t new_head_part_num, | |
150 | std::uint64_t tid, optional_yield y); | |
151 | void _prepare_new_head(const DoutPrefixProvider *dpp, std::int64_t new_head_part_num, std::uint64_t tid, lr::AioCompletion* c); | |
b3b6e05e | 152 | int push_entries(const DoutPrefixProvider *dpp, const std::deque<cb::list>& data_bufs, |
f67539c2 TL |
153 | std::uint64_t tid, optional_yield y); |
154 | void push_entries(const std::deque<cb::list>& data_bufs, | |
155 | std::uint64_t tid, lr::AioCompletion* c); | |
b3b6e05e | 156 | int trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs, |
39ae355f | 157 | bool exclusive, std::uint64_t tid, optional_yield y); |
20effc67 | 158 | void trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs, |
39ae355f | 159 | bool exclusive, std::uint64_t tid, lr::AioCompletion* c); |
f67539c2 TL |
160 | |
161 | /// Force refresh of metadata, yielding/blocking style | |
b3b6e05e | 162 | int read_meta(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y); |
f67539c2 | 163 | /// Force refresh of metadata, with a librados Completion |
b3b6e05e | 164 | void read_meta(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c); |
f67539c2 TL |
165 | |
166 | public: | |
167 | ||
168 | FIFO(const FIFO&) = delete; | |
169 | FIFO& operator =(const FIFO&) = delete; | |
170 | FIFO(FIFO&&) = delete; | |
171 | FIFO& operator =(FIFO&&) = delete; | |
172 | ||
173 | /// Open an existing FIFO. | |
b3b6e05e | 174 | static int open(const DoutPrefixProvider *dpp, lr::IoCtx ioctx, //< IO Context |
f67539c2 TL |
175 | std::string oid, //< OID for metadata object |
176 | std::unique_ptr<FIFO>* fifo, //< OUT: Pointer to FIFO object | |
177 | optional_yield y, //< Optional yield context | |
178 | /// Operation will fail if FIFO is not at this version | |
179 | std::optional<fifo::objv> objv = std::nullopt, | |
180 | /// Probing for existence, don't print errors if we | |
181 | /// can't find it. | |
182 | bool probe = false); | |
183 | /// Create a new or open an existing FIFO. | |
b3b6e05e | 184 | static int create(const DoutPrefixProvider *dpp, lr::IoCtx ioctx, //< IO Context |
f67539c2 TL |
185 | std::string oid, //< OID for metadata object |
186 | std::unique_ptr<FIFO>* fifo, //< OUT: Pointer to FIFO object | |
187 | optional_yield y, //< Optional yield context | |
188 | /// Operation will fail if the FIFO exists and is | |
189 | /// not of this version. | |
190 | std::optional<fifo::objv> objv = std::nullopt, | |
191 | /// Prefix for all objects | |
192 | std::optional<std::string_view> oid_prefix = std::nullopt, | |
193 | /// Fail if the FIFO already exists | |
194 | bool exclusive = false, | |
195 | /// Maximum allowed size of parts | |
196 | std::uint64_t max_part_size = default_max_part_size, | |
197 | /// Maximum allowed size of entries | |
198 | std::uint64_t max_entry_size = default_max_entry_size); | |
199 | ||
200 | /// Force refresh of metadata, yielding/blocking style | |
b3b6e05e | 201 | int read_meta(const DoutPrefixProvider *dpp, optional_yield y); |
f67539c2 TL |
202 | /// Get currently known metadata |
203 | const fifo::info& meta() const; | |
204 | /// Get partition header and entry overhead size | |
205 | std::pair<std::uint32_t, std::uint32_t> get_part_layout_info() const; | |
206 | /// Push an entry to the FIFO | |
b3b6e05e TL |
207 | int push(const DoutPrefixProvider *dpp, |
208 | const cb::list& bl, //< Entry to push | |
f67539c2 TL |
209 | optional_yield y //< Optional yield |
210 | ); | |
211 | /// Push an entry to the FIFO | |
b3b6e05e | 212 | void push(const DoutPrefixProvider *dpp, const cb::list& bl, //< Entry to push |
f67539c2 TL |
213 | lr::AioCompletion* c //< Async Completion |
214 | ); | |
215 | /// Push entries to the FIFO | |
20effc67 TL |
216 | int push(const DoutPrefixProvider *dpp, |
217 | const std::vector<cb::list>& data_bufs, //< Entries to push | |
f67539c2 TL |
218 | optional_yield y //< Optional yield |
219 | ); | |
220 | /// Push entries to the FIFO | |
b3b6e05e | 221 | void push(const DoutPrefixProvider *dpp, const std::vector<cb::list>& data_bufs, //< Entries to push |
f67539c2 TL |
222 | lr::AioCompletion* c //< Async Completion |
223 | ); | |
224 | /// List entries | |
b3b6e05e TL |
225 | int list(const DoutPrefixProvider *dpp, |
226 | int max_entries, //< Maximum entries to list | |
f67539c2 TL |
227 | /// Point after which to begin listing. Start at tail if null |
228 | std::optional<std::string_view> markstr, | |
229 | std::vector<list_entry>* out, //< OUT: entries | |
230 | /// OUT: True if more entries in FIFO beyond the last returned | |
231 | bool* more, | |
232 | optional_yield y //< Optional yield | |
233 | ); | |
b3b6e05e TL |
234 | void list(const DoutPrefixProvider *dpp, |
235 | int max_entries, //< Maximum entries to list | |
f67539c2 TL |
236 | /// Point after which to begin listing. Start at tail if null |
237 | std::optional<std::string_view> markstr, | |
238 | std::vector<list_entry>* out, //< OUT: entries | |
239 | /// OUT: True if more entries in FIFO beyond the last returned | |
240 | bool* more, | |
241 | lr::AioCompletion* c //< Async Completion | |
242 | ); | |
243 | /// Trim entries, coroutine/block style | |
b3b6e05e TL |
244 | int trim(const DoutPrefixProvider *dpp, |
245 | std::string_view markstr, //< Position to which to trim, inclusive | |
f67539c2 TL |
246 | bool exclusive, //< If true, do not trim the target entry |
247 | //< itself, just all those before it. | |
248 | optional_yield y //< Optional yield | |
249 | ); | |
250 | /// Trim entries, librados AioCompletion style | |
b3b6e05e TL |
251 | void trim(const DoutPrefixProvider *dpp, |
252 | std::string_view markstr, //< Position to which to trim, inclusive | |
f67539c2 TL |
253 | bool exclusive, //< If true, do not trim the target entry |
254 | //< itself, just all those before it. | |
255 | lr::AioCompletion* c //< librados AIO Completion | |
256 | ); | |
257 | /// Get part info | |
b3b6e05e | 258 | int get_part_info(const DoutPrefixProvider *dpp, int64_t part_num, /// Part number |
f67539c2 TL |
259 | fifo::part_header* header, //< OUT: Information |
260 | optional_yield y //< Optional yield | |
261 | ); | |
262 | /// Get part info | |
263 | void get_part_info(int64_t part_num, //< Part number | |
264 | fifo::part_header* header, //< OUT: Information | |
265 | lr::AioCompletion* c //< AIO Completion | |
266 | ); | |
267 | /// A convenience method to fetch the part information for the FIFO | |
268 | /// head, using librados::AioCompletion, since | |
269 | /// libradio::AioCompletions compose lousily. | |
b3b6e05e | 270 | void get_head_info(const DoutPrefixProvider *dpp, fu2::unique_function< //< Function to receive info |
f67539c2 TL |
271 | void(int r, fifo::part_header&&)>, |
272 | lr::AioCompletion* c //< AIO Completion | |
273 | ); | |
274 | }; | |
275 | ||
276 | template<typename T> | |
277 | struct Completion { | |
278 | private: | |
b3b6e05e | 279 | const DoutPrefixProvider *_dpp; |
f67539c2 TL |
280 | lr::AioCompletion* _cur = nullptr; |
281 | lr::AioCompletion* _super; | |
282 | public: | |
283 | ||
284 | using Ptr = std::unique_ptr<T>; | |
285 | ||
286 | lr::AioCompletion* cur() const { | |
287 | return _cur; | |
288 | } | |
289 | lr::AioCompletion* super() const { | |
290 | return _super; | |
291 | } | |
292 | ||
b3b6e05e | 293 | Completion(const DoutPrefixProvider *dpp, lr::AioCompletion* super) : _dpp(dpp), _super(super) { |
f67539c2 TL |
294 | super->pc->get(); |
295 | } | |
296 | ||
297 | ~Completion() { | |
298 | if (_super) { | |
299 | _super->pc->put(); | |
300 | } | |
301 | if (_cur) | |
302 | _cur->release(); | |
303 | _super = nullptr; | |
304 | _cur = nullptr; | |
305 | } | |
306 | ||
307 | // The only times that aio_operate can return an error are: | |
308 | // 1. The completion contains a null pointer. This should just | |
309 | // crash, and in our case it does. | |
310 | // 2. An attempt is made to write to a snapshot. RGW doesn't use | |
311 | // snapshots, so we don't care. | |
312 | // | |
313 | // So we will just assert that initiating an Aio operation succeeds | |
314 | // and not worry about recovering. | |
315 | static lr::AioCompletion* call(Ptr&& p) { | |
316 | p->_cur = lr::Rados::aio_create_completion(static_cast<void*>(p.get()), | |
317 | &cb); | |
318 | auto c = p->_cur; | |
319 | p.release(); | |
320 | return c; | |
321 | } | |
322 | static void complete(Ptr&& p, int r) { | |
323 | auto c = p->_super; | |
324 | p->_super = nullptr; | |
325 | rgw_complete_aio_completion(c, r); | |
326 | } | |
327 | ||
328 | static void cb(lr::completion_t, void* arg) { | |
329 | auto t = static_cast<T*>(arg); | |
330 | auto r = t->_cur->get_return_value(); | |
331 | t->_cur->release(); | |
332 | t->_cur = nullptr; | |
b3b6e05e | 333 | t->handle(t->_dpp, Ptr(t), r); |
f67539c2 TL |
334 | } |
335 | }; | |
336 | ||
337 | } | |
338 | ||
339 | #endif // CEPH_RGW_CLS_FIFO_LEGACY_H |