1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2020 Red Hat <contact@redhat.com>
7 * Author: Adam C. Emerson
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
19 #include <string_view>
21 #undef FMT_HEADER_ONLY
22 #define FMT_HEADER_ONLY 1
23 #include <fmt/format.h>
25 #include <boost/system/error_code.hpp>
27 #include "include/neorados/RADOS.hpp"
29 #include "include/buffer.h"
31 #include "common/random_string.h"
33 #include "cls/fifo/cls_fifo_types.h"
34 #include "cls/fifo/cls_fifo_ops.h"
38 namespace neorados::cls::fifo
{
// Namespace aliases used by the definitions that follow:
//   bs   = boost::system
//   cb   = ceph::buffer
//   fifo = rados::cls::fifo
39 namespace bs
= boost::system
;
40 namespace cb
= ceph::buffer
;
41 namespace fifo
= rados::cls::fifo
;
// Fill in a fifo::op::create_meta request and queue it on the write
// operation `op` as an exec of fifo::op::CLASS / fifo::op::CREATE_META.
//
// Visible here: oid_prefix, max_part_size, max_entry_size and exclusive
// are copied into the request field of the same name.
//
// NOTE(review): this listing is an extract. The numbers fused to the start
// of the lines are original line numbers, and they skip 46, 49, 51-53 and
// 58-60. The declaration of `exclusive`, whatever is done with `id` and
// `objv`, and the declaration/filling of `in` are therefore not visible.
// Presumably `in` holds the encoded request -- confirm against the full file.
43 void create_meta(WriteOp
& op
, std::string_view id
,
44 std::optional
<fifo::objv
> objv
,
45 std::optional
<std::string_view
> oid_prefix
,
47 std::uint64_t max_part_size
,
48 std::uint64_t max_entry_size
)
50 fifo::op::create_meta cm
;
54 cm
.oid_prefix
= oid_prefix
;
55 cm
.max_part_size
= max_part_size
;
56 cm
.max_entry_size
= max_entry_size
;
57 cm
.exclusive
= exclusive
;
// Hand the request to the object class method.
61 op
.exec(fifo::op::CLASS
, fifo::op::CREATE_META
, in
);
// Queue a fifo::op::GET_META exec on the read operation `op`, attaching a
// completion lambda that works with a fifo::op::get_meta_reply.
//
// ec_out, info, part_header_size and part_entry_overhead are optional
// outputs: each is tested for null before it is written. The lambda
// captures these raw pointers by value and dereferences them when it runs,
// so the pointees must still be alive at that point.
//
// NOTE(review): original lines 68, 70-72, 77, 79 and 81-82 are missing from
// this extract. How `objv` is used, how `in` is built, the decode of `bl`
// into `reply`, and the body of the catch handler are not visible.
64 void get_meta(ReadOp
& op
, std::optional
<fifo::objv
> objv
,
65 bs::error_code
* ec_out
, fifo::info
* info
,
66 std::uint32_t* part_header_size
,
67 std::uint32_t* part_entry_overhead
)
69 fifo::op::get_meta gm
;
73 op
.exec(fifo::op::CLASS
, fifo::op::GET_META
, in
,
74 [ec_out
, info
, part_header_size
,
75 part_entry_overhead
](bs::error_code ec
, const cb::list
& bl
) {
76 fifo::op::get_meta_reply reply
;
78 auto iter
= bl
.cbegin();
// Handler for cb::error; the matching try and the handler body are not
// visible in this extract.
80 } catch (const cb::error
& err
) {
// Publish results through whichever output pointers the caller supplied.
83 if (ec_out
) *ec_out
= ec
;
84 if (info
) *info
= std::move(reply
.info
);
85 if (part_header_size
) *part_header_size
= reply
.part_header_size
;
86 if (part_entry_overhead
)
87 *part_entry_overhead
= reply
.part_entry_overhead
;
// Build a fifo::op::update_meta from `update` and queue it on the write
// operation `op` as an exec of fifo::op::CLASS / fifo::op::UPDATE_META.
//
// Visible here: tail_part_num, head_part_num, min_push_part_num,
// max_push_part_num, journal_entries_add and journal_entries_rm are each
// read through the accessor of the same name on `update`.
//
// NOTE(review): `update` is declared `const fifo::update&`, so
// std::move(update) yields a const rvalue. Whether the journal_entries_*
// accessors can move out of that depends on fifo::update, which is not
// shown here -- likely this copies; confirm.
// NOTE(review): original lines 93, 95-96 and 103-105 are missing from this
// extract; the use of `objv` and the construction of `in` are not visible.
91 void update_meta(WriteOp
& op
, const fifo::objv
& objv
,
92 const fifo::update
& update
)
94 fifo::op::update_meta um
;
97 um
.tail_part_num
= update
.tail_part_num();
98 um
.head_part_num
= update
.head_part_num();
99 um
.min_push_part_num
= update
.min_push_part_num();
100 um
.max_push_part_num
= update
.max_push_part_num();
101 um
.journal_entries_add
= std::move(update
).journal_entries_add();
102 um
.journal_entries_rm
= std::move(update
).journal_entries_rm();
106 op
.exec(fifo::op::CLASS
, fifo::op::UPDATE_META
, in
);
// Queue a fifo::op::INIT_PART exec on the write operation `op`, using a
// fifo::op::init_part request object `ip`.
//
// NOTE(review): original lines 111 and 113-118 are missing from this
// extract, so how `tag` and `params` are placed into `ip` and how `in` is
// produced is not visible. Presumably both parameters are copied into the
// request -- confirm against the full file.
109 void part_init(WriteOp
& op
, std::string_view tag
,
110 fifo::data_params params
)
112 fifo::op::init_part ip
;
119 op
.exec(fifo::op::CLASS
, fifo::op::INIT_PART
, in
);
// Queue a fifo::op::PUSH_PART exec on the write operation `op`.
//
// Visible here: the request receives a copy of `data_bufs`, and
// pp.total_len is increased by the length() of every buffer in `data_bufs`.
// The callback `f` is moved into the completion lambda, which is declared
// `mutable` and is invoked with (bs::error_code, int, const cb::list&).
//
// NOTE(review): `data_bufs` is taken by value and then copy-assigned into
// pp.data_bufs. A move looks possible if the length loop iterated
// pp.data_bufs instead -- confirm before changing.
// NOTE(review): original lines 125, 127-128, 130-131, 134-136 and 139-143
// are missing from this extract; the use of `tag`, the construction of
// `in`, and the lambda body (how `f` is called) are not visible.
122 void push_part(WriteOp
& op
, std::string_view tag
,
123 std::deque
<cb::list
> data_bufs
,
124 fu2::unique_function
<void(bs::error_code
, int)> f
)
126 fifo::op::push_part pp
;
129 pp
.data_bufs
= data_bufs
;
// Sum the payload sizes into the request.
132 for (const auto& bl
: data_bufs
)
133 pp
.total_len
+= bl
.length();
137 op
.exec(fifo::op::CLASS
, fifo::op::PUSH_PART
, in
,
138 [f
= std::move(f
)](bs::error_code ec
, int r
, const cb::list
&) mutable {
// Queue a fifo::op::TRIM_PART exec on the write operation `op`.
//
// Visible here: the `exclusive` flag is copied into the request.
//
// NOTE(review): original lines 147, 149-151 and 153-155 are missing from
// this extract, so the handling of `tag` and `ofs` and the construction of
// `in` are not visible.
144 void trim_part(WriteOp
& op
,
145 std::optional
<std::string_view
> tag
,
146 std::uint64_t ofs
, bool exclusive
)
148 fifo::op::trim_part tp
;
152 tp
.exclusive
= exclusive
;
156 op
.exec(fifo::op::CLASS
, fifo::op::TRIM_PART
, in
);
// Queue a fifo::op::LIST_PART exec on the read operation `op`, attaching a
// completion lambda that works with a fifo::op::list_part_reply.
//
// Visible here: `max_entries` is copied into the request. The lambda
// captures the output pointers entries, more, full_part, ptag and ec_out by
// value; each is tested for null before being written, so all of them are
// optional. The pointees must still be alive when the lambda runs.
//
// NOTE(review): original lines 161 and 165-168 are missing from this
// extract, so part of the parameter list (including the declarations of
// `more`, `full_part` and `ptag`) is not visible. Lines 170-172, 174-176,
// 180, 182-184, 187-188 and 191-193 are missing as well: the use of `tag`,
// the construction of `in`, the decode of `bl`, and the conditions guarding
// the two `*ec_out = ec` assignments cannot be seen here.
159 void list_part(ReadOp
& op
,
160 std::optional
<string_view
> tag
,
162 std::uint64_t max_entries
,
163 bs::error_code
* ec_out
,
164 std::vector
<fifo::part_list_entry
>* entries
,
169 fifo::op::list_part lp
;
173 lp
.max_entries
= max_entries
;
177 op
.exec(fifo::op::CLASS
, fifo::op::LIST_PART
, in
,
178 [entries
, more
, full_part
, ptag
, ec_out
](bs::error_code ec
,
179 const cb::list
& bl
) {
181 if (ec_out
) *ec_out
= ec
;
185 fifo::op::list_part_reply reply
;
186 auto iter
= bl
.cbegin();
// Handler for cb::error; the matching try is not visible in this extract.
189 } catch (const cb::error
& err
) {
190 if (ec_out
) *ec_out
= ec
;
// Publish the reply through whichever output pointers were supplied.
194 if (entries
) *entries
= std::move(reply
.entries
);
195 if (more
) *more
= reply
.more
;
196 if (full_part
) *full_part
= reply
.full_part
;
197 if (ptag
) *ptag
= reply
.tag
;
// Queue a fifo::op::GET_PART_INFO exec on the read operation `op`,
// attaching a completion lambda that works with a
// fifo::op::get_part_info_reply.
//
// out_ec and header are optional outputs: each is tested for null before it
// is written. The lambda captures both raw pointers by value, so the
// pointees must still be alive when it runs.
//
// NOTE(review): the error output is named `out_ec` here, while get_meta and
// list_part in this file call theirs `ec_out`.
// NOTE(review): original lines 204, 206-208, 211, 213, 216-217 and 220-222
// are missing from this extract; the construction of `in`, the decode of
// `bl`, and the conditions guarding the two `*out_ec = ec` assignments are
// not visible.
201 void get_part_info(ReadOp
& op
,
202 bs::error_code
* out_ec
,
203 fifo::part_header
* header
)
205 fifo::op::get_part_info gpi
;
209 op
.exec(fifo::op::CLASS
, fifo::op::GET_PART_INFO
, in
,
210 [out_ec
, header
](bs::error_code ec
, const cb::list
& bl
) {
212 if (out_ec
) *out_ec
= ec
;
214 fifo::op::get_part_info_reply reply
;
215 auto iter
= bl
.cbegin();
// Handler for cb::error; the matching try is not visible in this extract.
218 } catch (const cb::error
& err
) {
219 if (out_ec
) *out_ec
= ec
;
223 if (header
) *header
= std::move(reply
.header
);
// Convert the string `s` into a marker, returned as std::optional<marker>.
//
// Visible here: one path assigns info.tail_part_num to m.num. Otherwise the
// string is searched for ':' and split into the text before it (`num`) and
// the text after it (`ofs`). Each piece is passed to ceph::parse,
// instantiated with the type of m.num and m.ofs respectively.
//
// NOTE(review): original lines 228-229, 231-234, 237-239, 242, 244-247 and
// 249-255 are missing from this extract. The declaration of `m`, the
// condition selecting the tail_part_num path, the handling of a missing ':'
// or a failed parse, and the return statements are not visible. Presumably
// an empty optional signals an invalid marker -- confirm.
227 std::optional
<marker
> FIFO::to_marker(std::string_view s
) {
230 m
.num
= info
.tail_part_num
;
// Locate the separator between the two numeric fields.
235 auto pos
= s
.find(':');
236 if (pos
== string::npos
) {
240 auto num
= s
.substr(0, pos
);
241 auto ofs
= s
.substr(pos
+ 1);
243 auto n
= ceph::parse
<decltype(m
.num
)>(num
);
248 auto o
= ceph::parse
<decltype(m
.ofs
)>(ofs
);
// Apply `update` to `*info` while holding a std::unique_lock on `m`, and
// report the outcome as a bs::error_code.
//
// Visible here: info->apply_update(update) is called and its result kept in
// `err`. Then `objv` is compared with info->version; a mismatch is logged
// at level 0 as "Raced locally!". A second level-0 log line prints `err`
// and is followed by `return errc::update_failed`.
//
// NOTE(review): info->apply_update() runs before the version comparison, so
// *info may already have been changed when a race is detected -- confirm
// this ordering is intended.
// NOTE(review): original lines 263-265 and 268-274 are missing from this
// extract; the value returned on a race, the condition guarding the ERROR
// branch, and the success path are not visible.
256 bs::error_code
FIFO::apply_update(fifo::info
* info
,
257 const fifo::objv
& objv
,
258 const fifo::update
& update
) {
259 std::unique_lock
l(m
);
260 auto err
= info
->apply_update(update
);
261 if (objv
!= info
->version
) {
262 ldout(r
->cct(), 0) << __func__
<< "(): Raced locally!" << dendl
;
266 ldout(r
->cct(), 0) << __func__
<< "(): ERROR: " << err
<< dendl
;
267 return errc::update_failed
;
// Return a tag string obtained from gen_rand_alphanumeric_plain(), called
// with this object's CephContext (r->cct()) and HEADER_TAG_SIZE (16).
// Presumably the result is 16 random alphanumeric characters -- the helper
// is defined elsewhere, so confirm there.
275 std::string
FIFO::generate_tag() const
277 static constexpr auto HEADER_TAG_SIZE
= 16;
278 return gen_rand_alphanumeric_plain(r
->cct(), HEADER_TAG_SIZE
);
// Error category type for the errc values used in this file. It derives
// publicly from ceph::converting_category and declares overrides for
// name(), both message() overloads, default_error_condition(),
// equivalent() and from_code().
//
// The pragmas push the diagnostic state and silence -Wnon-virtual-dtor for
// both GCC and clang across the class; the state is popped afterwards.
//
// NOTE(review): original lines 286-287, 292, 294 and 297 are missing from
// this extract (presumably the access specifier and the tails of the
// default_error_condition / equivalent declarations) -- confirm.
281 #pragma GCC diagnostic push
282 #pragma GCC diagnostic ignored "-Wnon-virtual-dtor"
283 #pragma clang diagnostic push
284 #pragma clang diagnostic ignored "-Wnon-virtual-dtor"
285 class error_category
: public ceph::converting_category
{
288 const char* name() const noexcept override
;
289 const char* message(int ev
, char*, std::size_t) const noexcept override
;
290 std::string
message(int ev
) const override
;
291 bs::error_condition
default_error_condition(int ev
) const noexcept
293 bool equivalent(int ev
, const bs::error_condition
& c
) const
// Keep the base class's other equivalent() overloads visible next to the
// one declared above.
295 using ceph::converting_category::equivalent
;
296 int from_code(int ev
) const noexcept override
;
298 #pragma GCC diagnostic pop
299 #pragma clang diagnostic pop
// Returns the category's name as a C string.
// NOTE(review): the body (original lines 302-303) is missing from this
// extract, so the actual string is not visible here.
301 const char* error_category::name() const noexcept
{
// Map an error value to a static, human-readable message.
//
// `ev` is cast to errc and switched on; every visible case returns a string
// literal. The char* / std::size_t parameters are unnamed and so cannot be
// used by the body. "Unknown error" is returned when no case matches.
//
// NOTE(review): original line 310 (the case label that owns
// "Retry-race count exceeded") is missing from this extract.
305 const char* error_category::message(int ev
, char*, std::size_t) const noexcept
{
309 switch (static_cast<errc
>(ev
)) {
311 return "Retry-race count exceeded";
313 case errc::inconsistency
:
314 return "Inconsistent result! New head before old head";
316 case errc::entry_too_large
:
317 return "Pushed entry too large";
319 case errc::invalid_marker
:
320 return "Invalid marker string";
322 case errc::update_failed
:
323 return "Update failed";
// No case matched.
326 return "Unknown error";
// std::string overload: forwards to the (int, char*, std::size_t) overload
// with a null buffer of size 0 and returns its result as a std::string.
329 std::string
error_category::message(int ev
) const {
330 return message(ev
, nullptr, 0);
// Map an error value to a portable bs::errc condition.
//
// Visible mapping:
//   inconsistency   -> io_error
//   entry_too_large -> value_too_large
//   invalid_marker  -> invalid_argument
//   update_failed   -> invalid_argument
// If no case matches, the result is built from the raw value and this
// category (`{ ev, *this }`).
//
// NOTE(review): original line 333 (the return type) and line 336 (the case
// label that owns operation_canceled) are missing from this extract.
334 error_category::default_error_condition(int ev
) const noexcept
{
335 switch (static_cast<errc
>(ev
)) {
337 return bs::errc::operation_canceled
;
339 case errc::inconsistency
:
340 return bs::errc::io_error
;
342 case errc::entry_too_large
:
343 return bs::errc::value_too_large
;
345 case errc::invalid_marker
:
346 return bs::errc::invalid_argument
;
348 case errc::update_failed
:
349 return bs::errc::invalid_argument
;
// No case matched: keep the value within this category.
352 return { ev
, *this };
// An error value is equivalent to condition `c` exactly when
// default_error_condition(ev) compares equal to `c`.
355 bool error_category::equivalent(int ev
, const bs::error_condition
& c
) const noexcept
{
356 return default_error_condition(ev
) == c
;
// Translate an error value to an int by switching on `ev` cast to errc.
//
// NOTE(review): only the switch and four case labels are visible. Every
// return statement (original lines 361-363, 365-366, 368-369, 371-372 and
// 374 onwards) is missing from this extract, so the produced values cannot
// be documented here. Presumably they are errno-style codes -- confirm.
359 int error_category::from_code(int ev
) const noexcept
{
360 switch (static_cast<errc
>(ev
)) {
364 case errc::inconsistency
:
367 case errc::entry_too_large
:
370 case errc::invalid_marker
:
373 case errc::update_failed
:
// Accessor for the category object, declared as a function-local static.
// The elaborated specifier `class error_category` is needed because this
// function has the same name as the class.
//
// NOTE(review): the return statement is not visible in this extract.
// Presumably it returns `c` -- confirm.
380 const bs::error_category
& error_category() noexcept
{
381 static const class error_category c
;