]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/cls_fifo_legacy.h
9a35e4dd251ce4024bda6ad1212a8acae0cb688c
[ceph.git] / ceph / src / rgw / cls_fifo_legacy.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2020 Red Hat <contact@redhat.com>
7 * Author: Adam C. Emerson
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16 #ifndef CEPH_RGW_CLS_FIFO_LEGACY_H
17 #define CEPH_RGW_CLS_FIFO_LEGACY_H
18
19 #include <cstdint>
20 #include <deque>
21 #include <map>
22 #include <memory>
23 #include <mutex>
24 #include <optional>
25 #include <string_view>
26 #include <vector>
27
28 #undef FMT_HEADER_ONLY
29 #define FMT_HEADER_ONLY 1
30 #include <fmt/format.h>
31
32 #include "include/rados/librados.hpp"
33 #include "include/buffer.h"
34 #include "include/function2.hpp"
35
36 #include "common/async/yield_context.h"
37
38 #include "cls/fifo/cls_fifo_types.h"
39 #include "cls/fifo/cls_fifo_ops.h"
40
41 #include "librados/AioCompletionImpl.h"
42
43 #include "rgw_tools.h"
44
45 namespace rgw::cls::fifo {
46 namespace cb = ceph::buffer;
47 namespace fifo = rados::cls::fifo;
48 namespace lr = librados;
49
/// Default cap on the size of one FIFO part: 4 MiB.
inline constexpr std::uint64_t default_max_part_size = std::uint64_t{4} << 20;
/// Default cap on the size of one FIFO entry: 32 KiB.
inline constexpr std::uint64_t default_max_entry_size = std::uint64_t{32} << 10;
52
53 void create_meta(lr::ObjectWriteOperation* op, std::string_view id,
54 std::optional<fifo::objv> objv,
55 std::optional<std::string_view> oid_prefix,
56 bool exclusive = false,
57 std::uint64_t max_part_size = default_max_part_size,
58 std::uint64_t max_entry_size = default_max_entry_size);
59 int get_meta(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid,
60 std::optional<fifo::objv> objv, fifo::info* info,
61 std::uint32_t* part_header_size,
62 std::uint32_t* part_entry_overhead,
63 std::uint64_t tid, optional_yield y,
64 bool probe = false);
65 struct marker {
66 std::int64_t num = 0;
67 std::uint64_t ofs = 0;
68
69 marker() = default;
70 marker(std::int64_t num, std::uint64_t ofs) : num(num), ofs(ofs) {}
71 static marker max() {
72 return { std::numeric_limits<decltype(num)>::max(),
73 std::numeric_limits<decltype(ofs)>::max() };
74 }
75
76 std::string to_string() {
77 return fmt::format("{:0>20}:{:0>20}", num, ofs);
78 }
79 };
80
81 struct list_entry {
82 cb::list data;
83 std::string marker;
84 ceph::real_time mtime;
85 };
86
87 using part_info = fifo::part_header;
88
89 /// This is an implementation of FIFO using librados to facilitate
90 /// backports. Please see /src/neorados/cls/fifo.h for full
91 /// information.
92 ///
93 /// This library uses optional_yield. Please see
94 /// /src/common/async/yield_context.h. In summary, optional_yield
95 /// contains either a spawn::yield_context (in which case the current
96 /// coroutine is suspended until completion) or null_yield (in which
97 /// case the current thread is blocked until completion.)
98 ///
99 /// Please see the librados documentation for information on
100 /// AioCompletion and IoCtx.
101
102 class FIFO {
103 friend struct Reader;
104 friend struct Updater;
105 friend struct Trimmer;
106 friend struct InfoGetter;
107 friend struct Pusher;
108 friend struct NewPartPreparer;
109 friend struct NewHeadPreparer;
110 friend struct JournalProcessor;
111 friend struct Lister;
112
113 mutable lr::IoCtx ioctx;
114 CephContext* cct = static_cast<CephContext*>(ioctx.cct());
115 const std::string oid;
116 std::mutex m;
117 std::uint64_t next_tid = 0;
118
119 fifo::info info;
120
121 std::uint32_t part_header_size = 0xdeadbeef;
122 std::uint32_t part_entry_overhead = 0xdeadbeef;
123
124 std::optional<marker> to_marker(std::string_view s);
125
126 FIFO(lr::IoCtx&& ioc,
127 std::string oid)
128 : ioctx(std::move(ioc)), oid(oid) {}
129
130 std::string generate_tag() const;
131
132 int apply_update(const DoutPrefixProvider *dpp,
133 fifo::info* info,
134 const fifo::objv& objv,
135 const fifo::update& update,
136 std::uint64_t tid);
137 int _update_meta(const DoutPrefixProvider *dpp, const fifo::update& update,
138 fifo::objv version, bool* pcanceled,
139 std::uint64_t tid, optional_yield y);
140 void _update_meta(const DoutPrefixProvider *dpp, const fifo::update& update,
141 fifo::objv version, bool* pcanceled,
142 std::uint64_t tid, lr::AioCompletion* c);
143 int create_part(const DoutPrefixProvider *dpp, int64_t part_num, std::string_view tag, std::uint64_t tid,
144 optional_yield y);
145 int remove_part(const DoutPrefixProvider *dpp, int64_t part_num, std::string_view tag, std::uint64_t tid,
146 optional_yield y);
147 int process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y);
148 void process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c);
149 int _prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::uint64_t tid, optional_yield y);
150 void _prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::uint64_t tid, lr::AioCompletion* c);
151 int _prepare_new_head(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y);
152 void _prepare_new_head(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c);
153 int push_entries(const DoutPrefixProvider *dpp, const std::deque<cb::list>& data_bufs,
154 std::uint64_t tid, optional_yield y);
155 void push_entries(const std::deque<cb::list>& data_bufs,
156 std::uint64_t tid, lr::AioCompletion* c);
157 int trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs,
158 std::optional<std::string_view> tag, bool exclusive,
159 std::uint64_t tid, optional_yield y);
160 void trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs,
161 std::optional<std::string_view> tag, bool exclusive,
162 std::uint64_t tid, lr::AioCompletion* c);
163
164 /// Force refresh of metadata, yielding/blocking style
165 int read_meta(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y);
166 /// Force refresh of metadata, with a librados Completion
167 void read_meta(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c);
168
169 public:
170
171 FIFO(const FIFO&) = delete;
172 FIFO& operator =(const FIFO&) = delete;
173 FIFO(FIFO&&) = delete;
174 FIFO& operator =(FIFO&&) = delete;
175
176 /// Open an existing FIFO.
177 static int open(const DoutPrefixProvider *dpp, lr::IoCtx ioctx, //< IO Context
178 std::string oid, //< OID for metadata object
179 std::unique_ptr<FIFO>* fifo, //< OUT: Pointer to FIFO object
180 optional_yield y, //< Optional yield context
181 /// Operation will fail if FIFO is not at this version
182 std::optional<fifo::objv> objv = std::nullopt,
183 /// Probing for existence, don't print errors if we
184 /// can't find it.
185 bool probe = false);
186 /// Create a new or open an existing FIFO.
187 static int create(const DoutPrefixProvider *dpp, lr::IoCtx ioctx, //< IO Context
188 std::string oid, //< OID for metadata object
189 std::unique_ptr<FIFO>* fifo, //< OUT: Pointer to FIFO object
190 optional_yield y, //< Optional yield context
191 /// Operation will fail if the FIFO exists and is
192 /// not of this version.
193 std::optional<fifo::objv> objv = std::nullopt,
194 /// Prefix for all objects
195 std::optional<std::string_view> oid_prefix = std::nullopt,
196 /// Fail if the FIFO already exists
197 bool exclusive = false,
198 /// Maximum allowed size of parts
199 std::uint64_t max_part_size = default_max_part_size,
200 /// Maximum allowed size of entries
201 std::uint64_t max_entry_size = default_max_entry_size);
202
203 /// Force refresh of metadata, yielding/blocking style
204 int read_meta(const DoutPrefixProvider *dpp, optional_yield y);
205 /// Get currently known metadata
206 const fifo::info& meta() const;
207 /// Get partition header and entry overhead size
208 std::pair<std::uint32_t, std::uint32_t> get_part_layout_info() const;
209 /// Push an entry to the FIFO
210 int push(const DoutPrefixProvider *dpp,
211 const cb::list& bl, //< Entry to push
212 optional_yield y //< Optional yield
213 );
214 /// Push an entry to the FIFO
215 void push(const DoutPrefixProvider *dpp, const cb::list& bl, //< Entry to push
216 lr::AioCompletion* c //< Async Completion
217 );
218 /// Push entries to the FIFO
219 int push(const DoutPrefixProvider *dpp,
220 const std::vector<cb::list>& data_bufs, //< Entries to push
221 optional_yield y //< Optional yield
222 );
223 /// Push entries to the FIFO
224 void push(const DoutPrefixProvider *dpp, const std::vector<cb::list>& data_bufs, //< Entries to push
225 lr::AioCompletion* c //< Async Completion
226 );
227 /// List entries
228 int list(const DoutPrefixProvider *dpp,
229 int max_entries, //< Maximum entries to list
230 /// Point after which to begin listing. Start at tail if null
231 std::optional<std::string_view> markstr,
232 std::vector<list_entry>* out, //< OUT: entries
233 /// OUT: True if more entries in FIFO beyond the last returned
234 bool* more,
235 optional_yield y //< Optional yield
236 );
237 void list(const DoutPrefixProvider *dpp,
238 int max_entries, //< Maximum entries to list
239 /// Point after which to begin listing. Start at tail if null
240 std::optional<std::string_view> markstr,
241 std::vector<list_entry>* out, //< OUT: entries
242 /// OUT: True if more entries in FIFO beyond the last returned
243 bool* more,
244 lr::AioCompletion* c //< Async Completion
245 );
246 /// Trim entries, coroutine/block style
247 int trim(const DoutPrefixProvider *dpp,
248 std::string_view markstr, //< Position to which to trim, inclusive
249 bool exclusive, //< If true, do not trim the target entry
250 //< itself, just all those before it.
251 optional_yield y //< Optional yield
252 );
253 /// Trim entries, librados AioCompletion style
254 void trim(const DoutPrefixProvider *dpp,
255 std::string_view markstr, //< Position to which to trim, inclusive
256 bool exclusive, //< If true, do not trim the target entry
257 //< itself, just all those before it.
258 lr::AioCompletion* c //< librados AIO Completion
259 );
260 /// Get part info
261 int get_part_info(const DoutPrefixProvider *dpp, int64_t part_num, /// Part number
262 fifo::part_header* header, //< OUT: Information
263 optional_yield y //< Optional yield
264 );
265 /// Get part info
266 void get_part_info(int64_t part_num, //< Part number
267 fifo::part_header* header, //< OUT: Information
268 lr::AioCompletion* c //< AIO Completion
269 );
270 /// A convenience method to fetch the part information for the FIFO
271 /// head, using librados::AioCompletion, since
272 /// libradio::AioCompletions compose lousily.
273 void get_head_info(const DoutPrefixProvider *dpp, fu2::unique_function< //< Function to receive info
274 void(int r, fifo::part_header&&)>,
275 lr::AioCompletion* c //< AIO Completion
276 );
277 };
278
279 template<typename T>
280 struct Completion {
281 private:
282 const DoutPrefixProvider *_dpp;
283 lr::AioCompletion* _cur = nullptr;
284 lr::AioCompletion* _super;
285 public:
286
287 using Ptr = std::unique_ptr<T>;
288
289 lr::AioCompletion* cur() const {
290 return _cur;
291 }
292 lr::AioCompletion* super() const {
293 return _super;
294 }
295
296 Completion(const DoutPrefixProvider *dpp, lr::AioCompletion* super) : _dpp(dpp), _super(super) {
297 super->pc->get();
298 }
299
300 ~Completion() {
301 if (_super) {
302 _super->pc->put();
303 }
304 if (_cur)
305 _cur->release();
306 _super = nullptr;
307 _cur = nullptr;
308 }
309
310 // The only times that aio_operate can return an error are:
311 // 1. The completion contains a null pointer. This should just
312 // crash, and in our case it does.
313 // 2. An attempt is made to write to a snapshot. RGW doesn't use
314 // snapshots, so we don't care.
315 //
316 // So we will just assert that initiating an Aio operation succeeds
317 // and not worry about recovering.
318 static lr::AioCompletion* call(Ptr&& p) {
319 p->_cur = lr::Rados::aio_create_completion(static_cast<void*>(p.get()),
320 &cb);
321 auto c = p->_cur;
322 p.release();
323 return c;
324 }
325 static void complete(Ptr&& p, int r) {
326 auto c = p->_super;
327 p->_super = nullptr;
328 rgw_complete_aio_completion(c, r);
329 }
330
331 static void cb(lr::completion_t, void* arg) {
332 auto t = static_cast<T*>(arg);
333 auto r = t->_cur->get_return_value();
334 t->_cur->release();
335 t->_cur = nullptr;
336 t->handle(t->_dpp, Ptr(t), r);
337 }
338 };
339
340 }
341
342 #endif // CEPH_RGW_CLS_FIFO_LEGACY_H