]> git.proxmox.com Git - ceph.git/blob - ceph/src/neorados/cls/fifo.cc
f95ede5ff8f61ead4f7c8d5cf29ef9f02f9a767d
[ceph.git] / ceph / src / neorados / cls / fifo.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2020 Red Hat <contact@redhat.com>
7 * Author: Adam C. Emerson
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16 #include <cstdint>
17 #include <numeric>
18 #include <optional>
19 #include <string_view>
20
21 #undef FMT_HEADER_ONLY
22 #define FMT_HEADER_ONLY 1
23 #include <fmt/format.h>
24
25 #include <boost/system/error_code.hpp>
26
27 #include "include/neorados/RADOS.hpp"
28
29 #include "include/buffer.h"
30
31 #include "common/random_string.h"
32
33 #include "cls/fifo/cls_fifo_types.h"
34 #include "cls/fifo/cls_fifo_ops.h"
35
36 #include "fifo.h"
37
38 using namespace std;
39
40 namespace neorados::cls::fifo {
41 namespace bs = boost::system;
42 namespace cb = ceph::buffer;
43 namespace fifo = rados::cls::fifo;
44
45 void create_meta(WriteOp& op, std::string_view id,
46 std::optional<fifo::objv> objv,
47 std::optional<std::string_view> oid_prefix,
48 bool exclusive,
49 std::uint64_t max_part_size,
50 std::uint64_t max_entry_size)
51 {
52 fifo::op::create_meta cm;
53
54 cm.id = id;
55 cm.version = objv;
56 cm.oid_prefix = oid_prefix;
57 cm.max_part_size = max_part_size;
58 cm.max_entry_size = max_entry_size;
59 cm.exclusive = exclusive;
60
61 cb::list in;
62 encode(cm, in);
63 op.exec(fifo::op::CLASS, fifo::op::CREATE_META, in);
64 }
65
66 void get_meta(ReadOp& op, std::optional<fifo::objv> objv,
67 bs::error_code* ec_out, fifo::info* info,
68 std::uint32_t* part_header_size,
69 std::uint32_t* part_entry_overhead)
70 {
71 fifo::op::get_meta gm;
72 gm.version = objv;
73 cb::list in;
74 encode(gm, in);
75 op.exec(fifo::op::CLASS, fifo::op::GET_META, in,
76 [ec_out, info, part_header_size,
77 part_entry_overhead](bs::error_code ec, const cb::list& bl) {
78 fifo::op::get_meta_reply reply;
79 if (!ec) try {
80 auto iter = bl.cbegin();
81 decode(reply, iter);
82 } catch (const cb::error& err) {
83 ec = err.code();
84 }
85 if (ec_out) *ec_out = ec;
86 if (info) *info = std::move(reply.info);
87 if (part_header_size) *part_header_size = reply.part_header_size;
88 if (part_entry_overhead)
89 *part_entry_overhead = reply.part_entry_overhead;
90 });
91 };
92
93 void update_meta(WriteOp& op, const fifo::objv& objv,
94 const fifo::update& update)
95 {
96 fifo::op::update_meta um;
97
98 um.version = objv;
99 um.tail_part_num = update.tail_part_num();
100 um.head_part_num = update.head_part_num();
101 um.min_push_part_num = update.min_push_part_num();
102 um.max_push_part_num = update.max_push_part_num();
103 um.journal_entries_add = std::move(update).journal_entries_add();
104 um.journal_entries_rm = std::move(update).journal_entries_rm();
105
106 cb::list in;
107 encode(um, in);
108 op.exec(fifo::op::CLASS, fifo::op::UPDATE_META, in);
109 }
110
111 void part_init(WriteOp& op, std::string_view tag,
112 fifo::data_params params)
113 {
114 fifo::op::init_part ip;
115
116 ip.tag = tag;
117 ip.params = params;
118
119 cb::list in;
120 encode(ip, in);
121 op.exec(fifo::op::CLASS, fifo::op::INIT_PART, in);
122 }
123
124 void push_part(WriteOp& op, std::string_view tag,
125 std::deque<cb::list> data_bufs,
126 fu2::unique_function<void(bs::error_code, int)> f)
127 {
128 fifo::op::push_part pp;
129
130 pp.tag = tag;
131 pp.data_bufs = data_bufs;
132 pp.total_len = 0;
133
134 for (const auto& bl : data_bufs)
135 pp.total_len += bl.length();
136
137 cb::list in;
138 encode(pp, in);
139 op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in,
140 [f = std::move(f)](bs::error_code ec, int r, const cb::list&) mutable {
141 std::move(f)(ec, r);
142 });
143 op.returnvec();
144 }
145
146 void trim_part(WriteOp& op,
147 std::optional<std::string_view> tag,
148 std::uint64_t ofs, bool exclusive)
149 {
150 fifo::op::trim_part tp;
151
152 tp.tag = tag;
153 tp.ofs = ofs;
154 tp.exclusive = exclusive;
155
156 bufferlist in;
157 encode(tp, in);
158 op.exec(fifo::op::CLASS, fifo::op::TRIM_PART, in);
159 }
160
161 void list_part(ReadOp& op,
162 std::optional<string_view> tag,
163 std::uint64_t ofs,
164 std::uint64_t max_entries,
165 bs::error_code* ec_out,
166 std::vector<fifo::part_list_entry>* entries,
167 bool* more,
168 bool* full_part,
169 std::string* ptag)
170 {
171 fifo::op::list_part lp;
172
173 lp.tag = tag;
174 lp.ofs = ofs;
175 lp.max_entries = max_entries;
176
177 bufferlist in;
178 encode(lp, in);
179 op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in,
180 [entries, more, full_part, ptag, ec_out](bs::error_code ec,
181 const cb::list& bl) {
182 if (ec) {
183 if (ec_out) *ec_out = ec;
184 return;
185 }
186
187 fifo::op::list_part_reply reply;
188 auto iter = bl.cbegin();
189 try {
190 decode(reply, iter);
191 } catch (const cb::error& err) {
192 if (ec_out) *ec_out = ec;
193 return;
194 }
195
196 if (entries) *entries = std::move(reply.entries);
197 if (more) *more = reply.more;
198 if (full_part) *full_part = reply.full_part;
199 if (ptag) *ptag = reply.tag;
200 });
201 }
202
203 void get_part_info(ReadOp& op,
204 bs::error_code* out_ec,
205 fifo::part_header* header)
206 {
207 fifo::op::get_part_info gpi;
208
209 bufferlist in;
210 encode(gpi, in);
211 op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in,
212 [out_ec, header](bs::error_code ec, const cb::list& bl) {
213 if (ec) {
214 if (out_ec) *out_ec = ec;
215 }
216 fifo::op::get_part_info_reply reply;
217 auto iter = bl.cbegin();
218 try {
219 decode(reply, iter);
220 } catch (const cb::error& err) {
221 if (out_ec) *out_ec = ec;
222 return;
223 }
224
225 if (header) *header = std::move(reply.header);
226 });
227 }
228
229 std::optional<marker> FIFO::to_marker(std::string_view s) {
230 marker m;
231 if (s.empty()) {
232 m.num = info.tail_part_num;
233 m.ofs = 0;
234 return m;
235 }
236
237 auto pos = s.find(':');
238 if (pos == string::npos) {
239 return std::nullopt;
240 }
241
242 auto num = s.substr(0, pos);
243 auto ofs = s.substr(pos + 1);
244
245 auto n = ceph::parse<decltype(m.num)>(num);
246 if (!n) {
247 return std::nullopt;
248 }
249 m.num = *n;
250 auto o = ceph::parse<decltype(m.ofs)>(ofs);
251 if (!o) {
252 return std::nullopt;
253 }
254 m.ofs = *o;
255 return m;
256 }
257
258 bs::error_code FIFO::apply_update(fifo::info* info,
259 const fifo::objv& objv,
260 const fifo::update& update) {
261 std::unique_lock l(m);
262 auto err = info->apply_update(update);
263 if (objv != info->version) {
264 ldout(r->cct(), 0) << __func__ << "(): Raced locally!" << dendl;
265 return errc::raced;
266 }
267 if (err) {
268 ldout(r->cct(), 0) << __func__ << "(): ERROR: " << err << dendl;
269 return errc::update_failed;
270 }
271
272 ++info->version.ver;
273
274 return {};
275 }
276
277 std::string FIFO::generate_tag() const
278 {
279 static constexpr auto HEADER_TAG_SIZE = 16;
280 return gen_rand_alphanumeric_plain(r->cct(), HEADER_TAG_SIZE);
281 }
282
283 #pragma GCC diagnostic push
284 #pragma GCC diagnostic ignored "-Wnon-virtual-dtor"
285 #pragma clang diagnostic push
286 #pragma clang diagnostic ignored "-Wnon-virtual-dtor"
287 class error_category : public ceph::converting_category {
288 public:
289 error_category(){}
290 const char* name() const noexcept override;
291 const char* message(int ev, char*, std::size_t) const noexcept override;
292 std::string message(int ev) const override;
293 bs::error_condition default_error_condition(int ev) const noexcept
294 override;
295 bool equivalent(int ev, const bs::error_condition& c) const
296 noexcept override;
297 using ceph::converting_category::equivalent;
298 int from_code(int ev) const noexcept override;
299 };
300 #pragma GCC diagnostic pop
301 #pragma clang diagnostic pop
302
303 const char* error_category::name() const noexcept {
304 return "FIFO";
305 }
306
307 const char* error_category::message(int ev, char*, std::size_t) const noexcept {
308 if (ev == 0)
309 return "No error";
310
311 switch (static_cast<errc>(ev)) {
312 case errc::raced:
313 return "Retry-race count exceeded";
314
315 case errc::inconsistency:
316 return "Inconsistent result! New head before old head";
317
318 case errc::entry_too_large:
319 return "Pushed entry too large";
320
321 case errc::invalid_marker:
322 return "Invalid marker string";
323
324 case errc::update_failed:
325 return "Update failed";
326 }
327
328 return "Unknown error";
329 }
330
331 std::string error_category::message(int ev) const {
332 return message(ev, nullptr, 0);
333 }
334
335 bs::error_condition
336 error_category::default_error_condition(int ev) const noexcept {
337 switch (static_cast<errc>(ev)) {
338 case errc::raced:
339 return bs::errc::operation_canceled;
340
341 case errc::inconsistency:
342 return bs::errc::io_error;
343
344 case errc::entry_too_large:
345 return bs::errc::value_too_large;
346
347 case errc::invalid_marker:
348 return bs::errc::invalid_argument;
349
350 case errc::update_failed:
351 return bs::errc::invalid_argument;
352 }
353
354 return { ev, *this };
355 }
356
357 bool error_category::equivalent(int ev, const bs::error_condition& c) const noexcept {
358 return default_error_condition(ev) == c;
359 }
360
361 int error_category::from_code(int ev) const noexcept {
362 switch (static_cast<errc>(ev)) {
363 case errc::raced:
364 return -ECANCELED;
365
366 case errc::inconsistency:
367 return -EIO;
368
369 case errc::entry_too_large:
370 return -E2BIG;
371
372 case errc::invalid_marker:
373 return -EINVAL;
374
375 case errc::update_failed:
376 return -EINVAL;
377
378 }
379 return -EDOM;
380 }
381
382 const bs::error_category& error_category() noexcept {
383 static const class error_category c;
384 return c;
385 }
386
387 }