]> git.proxmox.com Git - ceph.git/blob - ceph/src/neorados/cls/fifo.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / neorados / cls / fifo.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2020 Red Hat <contact@redhat.com>
7 * Author: Adam C. Emerson
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16 #include <cstdint>
17 #include <numeric>
18 #include <optional>
19 #include <string_view>
20
21 #undef FMT_HEADER_ONLY
22 #define FMT_HEADER_ONLY 1
23 #include <fmt/format.h>
24
25 #include <boost/system/error_code.hpp>
26
27 #include "include/neorados/RADOS.hpp"
28
29 #include "include/buffer.h"
30
31 #include "common/random_string.h"
32
33 #include "cls/fifo/cls_fifo_types.h"
34 #include "cls/fifo/cls_fifo_ops.h"
35
36 #include "fifo.h"
37
38 namespace neorados::cls::fifo {
39 namespace bs = boost::system;
40 namespace cb = ceph::buffer;
41 namespace fifo = rados::cls::fifo;
42
43 void create_meta(WriteOp& op, std::string_view id,
44 std::optional<fifo::objv> objv,
45 std::optional<std::string_view> oid_prefix,
46 bool exclusive,
47 std::uint64_t max_part_size,
48 std::uint64_t max_entry_size)
49 {
50 fifo::op::create_meta cm;
51
52 cm.id = id;
53 cm.version = objv;
54 cm.oid_prefix = oid_prefix;
55 cm.max_part_size = max_part_size;
56 cm.max_entry_size = max_entry_size;
57 cm.exclusive = exclusive;
58
59 cb::list in;
60 encode(cm, in);
61 op.exec(fifo::op::CLASS, fifo::op::CREATE_META, in);
62 }
63
64 void get_meta(ReadOp& op, std::optional<fifo::objv> objv,
65 bs::error_code* ec_out, fifo::info* info,
66 std::uint32_t* part_header_size,
67 std::uint32_t* part_entry_overhead)
68 {
69 fifo::op::get_meta gm;
70 gm.version = objv;
71 cb::list in;
72 encode(gm, in);
73 op.exec(fifo::op::CLASS, fifo::op::GET_META, in,
74 [ec_out, info, part_header_size,
75 part_entry_overhead](bs::error_code ec, const cb::list& bl) {
76 fifo::op::get_meta_reply reply;
77 if (!ec) try {
78 auto iter = bl.cbegin();
79 decode(reply, iter);
80 } catch (const cb::error& err) {
81 ec = err.code();
82 }
83 if (ec_out) *ec_out = ec;
84 if (info) *info = std::move(reply.info);
85 if (part_header_size) *part_header_size = reply.part_header_size;
86 if (part_entry_overhead)
87 *part_entry_overhead = reply.part_entry_overhead;
88 });
89 };
90
91 void update_meta(WriteOp& op, const fifo::objv& objv,
92 const fifo::update& update)
93 {
94 fifo::op::update_meta um;
95
96 um.version = objv;
97 um.tail_part_num = update.tail_part_num();
98 um.head_part_num = update.head_part_num();
99 um.min_push_part_num = update.min_push_part_num();
100 um.max_push_part_num = update.max_push_part_num();
101 um.journal_entries_add = std::move(update).journal_entries_add();
102 um.journal_entries_rm = std::move(update).journal_entries_rm();
103
104 cb::list in;
105 encode(um, in);
106 op.exec(fifo::op::CLASS, fifo::op::UPDATE_META, in);
107 }
108
109 void part_init(WriteOp& op, std::string_view tag,
110 fifo::data_params params)
111 {
112 fifo::op::init_part ip;
113
114 ip.tag = tag;
115 ip.params = params;
116
117 cb::list in;
118 encode(ip, in);
119 op.exec(fifo::op::CLASS, fifo::op::INIT_PART, in);
120 }
121
122 void push_part(WriteOp& op, std::string_view tag,
123 std::deque<cb::list> data_bufs,
124 fu2::unique_function<void(bs::error_code, int)> f)
125 {
126 fifo::op::push_part pp;
127
128 pp.tag = tag;
129 pp.data_bufs = data_bufs;
130 pp.total_len = 0;
131
132 for (const auto& bl : data_bufs)
133 pp.total_len += bl.length();
134
135 cb::list in;
136 encode(pp, in);
137 op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in,
138 [f = std::move(f)](bs::error_code ec, int r, const cb::list&) mutable {
139 std::move(f)(ec, r);
140 });
141 op.returnvec();
142 }
143
144 void trim_part(WriteOp& op,
145 std::optional<std::string_view> tag,
146 std::uint64_t ofs, bool exclusive)
147 {
148 fifo::op::trim_part tp;
149
150 tp.tag = tag;
151 tp.ofs = ofs;
152 tp.exclusive = exclusive;
153
154 bufferlist in;
155 encode(tp, in);
156 op.exec(fifo::op::CLASS, fifo::op::TRIM_PART, in);
157 }
158
159 void list_part(ReadOp& op,
160 std::optional<string_view> tag,
161 std::uint64_t ofs,
162 std::uint64_t max_entries,
163 bs::error_code* ec_out,
164 std::vector<fifo::part_list_entry>* entries,
165 bool* more,
166 bool* full_part,
167 std::string* ptag)
168 {
169 fifo::op::list_part lp;
170
171 lp.tag = tag;
172 lp.ofs = ofs;
173 lp.max_entries = max_entries;
174
175 bufferlist in;
176 encode(lp, in);
177 op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in,
178 [entries, more, full_part, ptag, ec_out](bs::error_code ec,
179 const cb::list& bl) {
180 if (ec) {
181 if (ec_out) *ec_out = ec;
182 return;
183 }
184
185 fifo::op::list_part_reply reply;
186 auto iter = bl.cbegin();
187 try {
188 decode(reply, iter);
189 } catch (const cb::error& err) {
190 if (ec_out) *ec_out = ec;
191 return;
192 }
193
194 if (entries) *entries = std::move(reply.entries);
195 if (more) *more = reply.more;
196 if (full_part) *full_part = reply.full_part;
197 if (ptag) *ptag = reply.tag;
198 });
199 }
200
201 void get_part_info(ReadOp& op,
202 bs::error_code* out_ec,
203 fifo::part_header* header)
204 {
205 fifo::op::get_part_info gpi;
206
207 bufferlist in;
208 encode(gpi, in);
209 op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in,
210 [out_ec, header](bs::error_code ec, const cb::list& bl) {
211 if (ec) {
212 if (out_ec) *out_ec = ec;
213 }
214 fifo::op::get_part_info_reply reply;
215 auto iter = bl.cbegin();
216 try {
217 decode(reply, iter);
218 } catch (const cb::error& err) {
219 if (out_ec) *out_ec = ec;
220 return;
221 }
222
223 if (header) *header = std::move(reply.header);
224 });
225 }
226
227 std::optional<marker> FIFO::to_marker(std::string_view s) {
228 marker m;
229 if (s.empty()) {
230 m.num = info.tail_part_num;
231 m.ofs = 0;
232 return m;
233 }
234
235 auto pos = s.find(':');
236 if (pos == string::npos) {
237 return std::nullopt;
238 }
239
240 auto num = s.substr(0, pos);
241 auto ofs = s.substr(pos + 1);
242
243 auto n = ceph::parse<decltype(m.num)>(num);
244 if (!n) {
245 return std::nullopt;
246 }
247 m.num = *n;
248 auto o = ceph::parse<decltype(m.ofs)>(ofs);
249 if (!o) {
250 return std::nullopt;
251 }
252 m.ofs = *o;
253 return m;
254 }
255
256 bs::error_code FIFO::apply_update(fifo::info* info,
257 const fifo::objv& objv,
258 const fifo::update& update) {
259 std::unique_lock l(m);
260 auto err = info->apply_update(update);
261 if (objv != info->version) {
262 ldout(r->cct(), 0) << __func__ << "(): Raced locally!" << dendl;
263 return errc::raced;
264 }
265 if (err) {
266 ldout(r->cct(), 0) << __func__ << "(): ERROR: " << err << dendl;
267 return errc::update_failed;
268 }
269
270 ++info->version.ver;
271
272 return {};
273 }
274
275 std::string FIFO::generate_tag() const
276 {
277 static constexpr auto HEADER_TAG_SIZE = 16;
278 return gen_rand_alphanumeric_plain(r->cct(), HEADER_TAG_SIZE);
279 }
280
281 #pragma GCC diagnostic push
282 #pragma GCC diagnostic ignored "-Wnon-virtual-dtor"
283 #pragma clang diagnostic push
284 #pragma clang diagnostic ignored "-Wnon-virtual-dtor"
285 class error_category : public ceph::converting_category {
286 public:
287 error_category(){}
288 const char* name() const noexcept override;
289 const char* message(int ev, char*, std::size_t) const noexcept override;
290 std::string message(int ev) const override;
291 bs::error_condition default_error_condition(int ev) const noexcept
292 override;
293 bool equivalent(int ev, const bs::error_condition& c) const
294 noexcept override;
295 using ceph::converting_category::equivalent;
296 int from_code(int ev) const noexcept override;
297 };
298 #pragma GCC diagnostic pop
299 #pragma clang diagnostic pop
300
301 const char* error_category::name() const noexcept {
302 return "FIFO";
303 }
304
305 const char* error_category::message(int ev, char*, std::size_t) const noexcept {
306 if (ev == 0)
307 return "No error";
308
309 switch (static_cast<errc>(ev)) {
310 case errc::raced:
311 return "Retry-race count exceeded";
312
313 case errc::inconsistency:
314 return "Inconsistent result! New head before old head";
315
316 case errc::entry_too_large:
317 return "Pushed entry too large";
318
319 case errc::invalid_marker:
320 return "Invalid marker string";
321
322 case errc::update_failed:
323 return "Update failed";
324 }
325
326 return "Unknown error";
327 }
328
329 std::string error_category::message(int ev) const {
330 return message(ev, nullptr, 0);
331 }
332
333 bs::error_condition
334 error_category::default_error_condition(int ev) const noexcept {
335 switch (static_cast<errc>(ev)) {
336 case errc::raced:
337 return bs::errc::operation_canceled;
338
339 case errc::inconsistency:
340 return bs::errc::io_error;
341
342 case errc::entry_too_large:
343 return bs::errc::value_too_large;
344
345 case errc::invalid_marker:
346 return bs::errc::invalid_argument;
347
348 case errc::update_failed:
349 return bs::errc::invalid_argument;
350 }
351
352 return { ev, *this };
353 }
354
355 bool error_category::equivalent(int ev, const bs::error_condition& c) const noexcept {
356 return default_error_condition(ev) == c;
357 }
358
359 int error_category::from_code(int ev) const noexcept {
360 switch (static_cast<errc>(ev)) {
361 case errc::raced:
362 return -ECANCELED;
363
364 case errc::inconsistency:
365 return -EIO;
366
367 case errc::entry_too_large:
368 return -E2BIG;
369
370 case errc::invalid_marker:
371 return -EINVAL;
372
373 case errc::update_failed:
374 return -EINVAL;
375
376 }
377 return -EDOM;
378 }
379
380 const bs::error_category& error_category() noexcept {
381 static const class error_category c;
382 return c;
383 }
384
385 }