1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2020 Red Hat <contact@redhat.com>
7 * Author: Adam C. Emerson
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
19 #include <string_view>
21 #undef FMT_HEADER_ONLY
22 #define FMT_HEADER_ONLY 1
23 #include <fmt/format.h>
25 #include <boost/system/error_code.hpp>
27 #include "include/neorados/RADOS.hpp"
29 #include "include/buffer.h"
31 #include "common/random_string.h"
33 #include "cls/fifo/cls_fifo_types.h"
34 #include "cls/fifo/cls_fifo_ops.h"
40 namespace neorados::cls::fifo
{
// Short namespace aliases used by the definitions that follow:
//   bs   -> boost::system   (error_code / error_condition / errc)
//   cb   -> ceph::buffer    (buffer list and decode error types)
//   fifo -> rados::cls::fifo (FIFO object-class types and op structs)
41 namespace bs
= boost::system
;
42 namespace cb
= ceph::buffer
;
43 namespace fifo
= rados::cls::fifo
;
// Adds a FIFO CREATE_META class call to the write operation `op`.
//
// Visible behavior: a fifo::op::create_meta request `cm` is populated from
// the arguments (oid_prefix, max_part_size, max_entry_size, exclusive) and
// handed to op.exec() under fifo::op::CLASS / fifo::op::CREATE_META.
//
// NOTE(review): the declaration of `exclusive`, any use of `id` / `objv`,
// and the construction of the input buffer `in` are not visible in this
// excerpt -- presumably `in` carries the encoded `cm`; confirm against the
// full source.
45 void create_meta(WriteOp
& op
, std::string_view id
,
46 std::optional
<fifo::objv
> objv
,
47 std::optional
<std::string_view
> oid_prefix
,
49 std::uint64_t max_part_size
,
50 std::uint64_t max_entry_size
)
52 fifo::op::create_meta cm
;
56 cm
.oid_prefix
= oid_prefix
;
57 cm
.max_part_size
= max_part_size
;
58 cm
.max_entry_size
= max_entry_size
;
59 cm
.exclusive
= exclusive
;
63 op
.exec(fifo::op::CLASS
, fifo::op::CREATE_META
, in
);
// Adds a FIFO GET_META class call to the read operation `op` and registers
// a completion handler that fills the caller-supplied output pointers.
//
// Outputs (each written only when its pointer is non-null):
//   ec_out              <- the handler's error code `ec`
//   info                <- reply.info (moved out of the decoded reply)
//   part_header_size    <- reply.part_header_size
//   part_entry_overhead <- reply.part_entry_overhead
//
// The handler captures the raw output pointers by value, so they must stay
// valid until the handler has run.
//
// NOTE(review): how `objv` is used, how `in` is built, and the body of the
// try/catch around the reply (including what the cb::error handler does) are
// not visible in this excerpt -- confirm against the full source.
66 void get_meta(ReadOp
& op
, std::optional
<fifo::objv
> objv
,
67 bs::error_code
* ec_out
, fifo::info
* info
,
68 std::uint32_t* part_header_size
,
69 std::uint32_t* part_entry_overhead
)
71 fifo::op::get_meta gm
;
75 op
.exec(fifo::op::CLASS
, fifo::op::GET_META
, in
,
76 [ec_out
, info
, part_header_size
,
77 part_entry_overhead
](bs::error_code ec
, const cb::list
& bl
) {
78 fifo::op::get_meta_reply reply
;
80 auto iter
= bl
.cbegin();
82 } catch (const cb::error
& err
) {
85 if (ec_out
) *ec_out
= ec
;
86 if (info
) *info
= std::move(reply
.info
);
87 if (part_header_size
) *part_header_size
= reply
.part_header_size
;
88 if (part_entry_overhead
)
89 *part_entry_overhead
= reply
.part_entry_overhead
;
// Adds a FIFO UPDATE_META class call to the write operation `op`.
//
// Visible behavior: the fields of `update` (tail/head part numbers, min/max
// push part numbers, journal entries to add and to remove) are copied into a
// fifo::op::update_meta request `um`, which is then passed to op.exec() under
// fifo::op::CLASS / fifo::op::UPDATE_META.
//
// NOTE(review): `update` is a const reference, so std::move(update) produces
// a const rvalue; whether the journal accessors then move or copy depends on
// the overloads declared on fifo::update, which are not visible here.
// NOTE(review): the use of `objv` and the construction of `in` are not
// visible in this excerpt -- confirm against the full source.
93 void update_meta(WriteOp
& op
, const fifo::objv
& objv
,
94 const fifo::update
& update
)
96 fifo::op::update_meta um
;
99 um
.tail_part_num
= update
.tail_part_num();
100 um
.head_part_num
= update
.head_part_num();
101 um
.min_push_part_num
= update
.min_push_part_num();
102 um
.max_push_part_num
= update
.max_push_part_num();
103 um
.journal_entries_add
= std::move(update
).journal_entries_add();
104 um
.journal_entries_rm
= std::move(update
).journal_entries_rm();
108 op
.exec(fifo::op::CLASS
, fifo::op::UPDATE_META
, in
);
// Adds a FIFO INIT_PART class call to the write operation `op`.
//
// Visible behavior: a fifo::op::init_part request `ip` is declared and
// op.exec() is invoked under fifo::op::CLASS / fifo::op::INIT_PART.
//
// NOTE(review): how `tag` and `params` populate `ip`, and how `in` is built,
// are not visible in this excerpt -- confirm against the full source.
111 void part_init(WriteOp
& op
, std::string_view tag
,
112 fifo::data_params params
)
114 fifo::op::init_part ip
;
121 op
.exec(fifo::op::CLASS
, fifo::op::INIT_PART
, in
);
// Adds a FIFO PUSH_PART class call to the write operation `op`.
//
// Visible behavior:
//   - `data_bufs` is copy-assigned into the request `pp`;
//   - pp.total_len accumulates the length of every buffer in `data_bufs`;
//   - op.exec() is invoked under fifo::op::CLASS / fifo::op::PUSH_PART with
//     a mutable completion handler that takes ownership of `f` (moved into
//     the capture) and receives (error_code, int, buffer list).
//
// NOTE(review): `data_bufs` is taken by value and then copied into `pp`; it
// is still iterated afterwards, so a move there would need the loop to read
// pp.data_bufs instead -- possible saving, verify before changing.
// NOTE(review): the initial value of pp.total_len, the use of `tag`, the
// construction of `in`, and the handler body are not visible in this excerpt.
124 void push_part(WriteOp
& op
, std::string_view tag
,
125 std::deque
<cb::list
> data_bufs
,
126 fu2::unique_function
<void(bs::error_code
, int)> f
)
128 fifo::op::push_part pp
;
131 pp
.data_bufs
= data_bufs
;
134 for (const auto& bl
: data_bufs
)
135 pp
.total_len
+= bl
.length();
139 op
.exec(fifo::op::CLASS
, fifo::op::PUSH_PART
, in
,
140 [f
= std::move(f
)](bs::error_code ec
, int r
, const cb::list
&) mutable {
// Adds a FIFO TRIM_PART class call to the write operation `op`.
//
// Visible behavior: the request `tp` receives the `exclusive` flag and
// op.exec() is invoked under fifo::op::CLASS / fifo::op::TRIM_PART.
//
// NOTE(review): how `tag` and `ofs` are stored in `tp`, and how `in` is
// built, are not visible in this excerpt -- confirm against the full source.
146 void trim_part(WriteOp
& op
,
147 std::optional
<std::string_view
> tag
,
148 std::uint64_t ofs
, bool exclusive
)
150 fifo::op::trim_part tp
;
154 tp
.exclusive
= exclusive
;
158 op
.exec(fifo::op::CLASS
, fifo::op::TRIM_PART
, in
);
// Adds a FIFO LIST_PART class call to the read operation `op` and registers
// a completion handler that fills the caller-supplied output pointers.
//
// Visible behavior: lp.max_entries is set from `max_entries`; the handler
// captures entries, more, full_part, ptag and ec_out by value and, for each
// non-null pointer, stores:
//   ec_out    <- ec
//   entries   <- reply.entries (moved)
//   more      <- reply.more
//   full_part <- reply.full_part
//   ptag      <- reply.tag
// The captured raw pointers must stay valid until the handler has run.
//
// NOTE(review): the declarations of `more`, `full_part` and `ptag`, the use
// of `tag`, the construction of `in`, and the conditions guarding the two
// `*ec_out = ec` assignments are not visible in this excerpt.
// NOTE(review): `string_view` is unqualified here while sibling functions
// spell `std::string_view`; this relies on a using-declaration not shown.
161 void list_part(ReadOp
& op
,
162 std::optional
<string_view
> tag
,
164 std::uint64_t max_entries
,
165 bs::error_code
* ec_out
,
166 std::vector
<fifo::part_list_entry
>* entries
,
171 fifo::op::list_part lp
;
175 lp
.max_entries
= max_entries
;
179 op
.exec(fifo::op::CLASS
, fifo::op::LIST_PART
, in
,
180 [entries
, more
, full_part
, ptag
, ec_out
](bs::error_code ec
,
181 const cb::list
& bl
) {
183 if (ec_out
) *ec_out
= ec
;
187 fifo::op::list_part_reply reply
;
188 auto iter
= bl
.cbegin();
191 } catch (const cb::error
& err
) {
// NOTE(review): the handler reports `ec`, not anything derived from `err`.
// If `ec` is success at this point, a decode failure would be reported as
// success -- confirm against the full source.
192 if (ec_out
) *ec_out
= ec
;
196 if (entries
) *entries
= std::move(reply
.entries
);
197 if (more
) *more
= reply
.more
;
198 if (full_part
) *full_part
= reply
.full_part
;
199 if (ptag
) *ptag
= reply
.tag
;
// Adds a FIFO GET_PART_INFO class call to the read operation `op` and
// registers a completion handler that fills the caller-supplied outputs.
//
// Visible behavior: the handler captures out_ec and header by value and, for
// each non-null pointer, stores:
//   out_ec <- ec
//   header <- reply.header (moved)
// The captured raw pointers must stay valid until the handler has run.
//
// NOTE(review): the construction of `in` and the conditions guarding the two
// `*out_ec = ec` assignments are not visible in this excerpt.
203 void get_part_info(ReadOp
& op
,
204 bs::error_code
* out_ec
,
205 fifo::part_header
* header
)
207 fifo::op::get_part_info gpi
;
211 op
.exec(fifo::op::CLASS
, fifo::op::GET_PART_INFO
, in
,
212 [out_ec
, header
](bs::error_code ec
, const cb::list
& bl
) {
214 if (out_ec
) *out_ec
= ec
;
216 fifo::op::get_part_info_reply reply
;
217 auto iter
= bl
.cbegin();
220 } catch (const cb::error
& err
) {
// NOTE(review): the handler reports `ec`, not anything derived from `err`.
// If `ec` is success at this point, a decode failure would be reported as
// success -- confirm against the full source.
221 if (out_ec
) *out_ec
= ec
;
225 if (header
) *header
= std::move(reply
.header
);
// Converts the textual marker `s` into a `marker`, returning it wrapped in
// std::optional.
//
// Visible behavior:
//   - one path sets m.num from info.tail_part_num;
//   - otherwise `s` is split at the first ':' into a part-number substring
//     (before the colon) and an offset substring (after it);
//   - each substring is parsed with ceph::parse<> using the declared type of
//     the destination field (m.num / m.ofs).
//
// NOTE(review): the condition selecting the tail_part_num path, the handling
// of a missing ':' and of parse failures, and the return statements are not
// visible in this excerpt -- presumably failures yield std::nullopt; confirm.
229 std::optional
<marker
> FIFO::to_marker(std::string_view s
) {
232 m
.num
= info
.tail_part_num
;
237 auto pos
= s
.find(':');
238 if (pos
== string::npos
) {
242 auto num
= s
.substr(0, pos
);
243 auto ofs
= s
.substr(pos
+ 1);
245 auto n
= ceph::parse
<decltype(m
.num
)>(num
);
250 auto o
= ceph::parse
<decltype(m
.ofs
)>(ofs
);
// Applies `update` to the metadata object pointed to by `info`, while
// holding a std::unique_lock on the member mutex `m`.
//
// Visible behavior:
//   - info->apply_update(update) is called and its result kept in `err`;
//   - if `objv` differs from info->version, "Raced locally!" is logged at
//     level 0;
//   - an "ERROR: <err>" message is logged at level 0 and
//     errc::update_failed is returned on the failure path.
//
// NOTE(review): the value returned after the "Raced locally!" log, the
// condition guarding the ERROR branch, and the success path are not visible
// in this excerpt -- confirm against the full source.
258 bs::error_code
FIFO::apply_update(fifo::info
* info
,
259 const fifo::objv
& objv
,
260 const fifo::update
& update
) {
261 std::unique_lock
l(m
);
262 auto err
= info
->apply_update(update
);
263 if (objv
!= info
->version
) {
264 ldout(r
->cct(), 0) << __func__
<< "(): Raced locally!" << dendl
;
268 ldout(r
->cct(), 0) << __func__
<< "(): ERROR: " << err
<< dendl
;
269 return errc::update_failed
;
// Returns a fresh tag string produced by gen_rand_alphanumeric_plain() with
// the context from r->cct() and a requested size of HEADER_TAG_SIZE (16).
277 std::string
FIFO::generate_tag() const
279 static constexpr auto HEADER_TAG_SIZE
= 16;
280 return gen_rand_alphanumeric_plain(r
->cct(), HEADER_TAG_SIZE
);
// Error category for this module's `errc` values.
//
// The class derives publicly from ceph::converting_category and declares
// overrides for name(), both message() overloads and from_code(), plus
// default_error_condition() and equivalent(). The base-class `equivalent`
// overloads are re-exposed with a using-declaration so the overload declared
// here does not hide them.
//
// The surrounding pragmas suppress -Wnon-virtual-dtor for GCC and clang for
// the duration of the class declaration, then restore the previous
// diagnostic state.
283 #pragma GCC diagnostic push
284 #pragma GCC diagnostic ignored "-Wnon-virtual-dtor"
285 #pragma clang diagnostic push
286 #pragma clang diagnostic ignored "-Wnon-virtual-dtor"
287 class error_category
: public ceph::converting_category
{
290 const char* name() const noexcept override
;
291 const char* message(int ev
, char*, std::size_t) const noexcept override
;
292 std::string
message(int ev
) const override
;
293 bs::error_condition
default_error_condition(int ev
) const noexcept
295 bool equivalent(int ev
, const bs::error_condition
& c
) const
297 using ceph::converting_category::equivalent
;
298 int from_code(int ev
) const noexcept override
;
300 #pragma GCC diagnostic pop
301 #pragma clang diagnostic pop
// Returns the name of this error category as a C string.
// NOTE(review): the returned literal is not visible in this excerpt.
303 const char* error_category::name() const noexcept
{
// Maps the error value `ev` (interpreted as `errc`) to a static description.
// The buffer and length parameters are unnamed and therefore unused.
//
// Visible mappings:
//   errc::inconsistency   -> "Inconsistent result! New head before old head"
//   errc::entry_too_large -> "Pushed entry too large"
//   errc::invalid_marker  -> "Invalid marker string"
//   errc::update_failed   -> "Update failed"
// "Unknown error" is returned when no visible case applies.
//
// NOTE(review): the case label(s) preceding "Retry-race count exceeded" are
// not visible in this excerpt -- confirm against the full source.
307 const char* error_category::message(int ev
, char*, std::size_t) const noexcept
{
311 switch (static_cast<errc
>(ev
)) {
313 return "Retry-race count exceeded";
315 case errc::inconsistency
:
316 return "Inconsistent result! New head before old head";
318 case errc::entry_too_large
:
319 return "Pushed entry too large";
321 case errc::invalid_marker
:
322 return "Invalid marker string";
324 case errc::update_failed
:
325 return "Update failed";
328 return "Unknown error";
// std::string overload: delegates to the buffer-taking overload with a null
// buffer and zero length, and returns the resulting text as a std::string.
331 std::string
error_category::message(int ev
) const {
332 return message(ev
, nullptr, 0);
// Maps the error value `ev` (interpreted as `errc`) to a generic
// boost::system error condition.
//
// Visible mappings:
//   errc::inconsistency   -> bs::errc::io_error
//   errc::entry_too_large -> bs::errc::value_too_large
//   errc::invalid_marker  -> bs::errc::invalid_argument
//   errc::update_failed   -> bs::errc::invalid_argument
// Values with no visible case fall back to a condition built from `ev` and
// this category itself.
//
// NOTE(review): the return-type line and the case label(s) that map to
// bs::errc::operation_canceled are not visible in this excerpt.
336 error_category::default_error_condition(int ev
) const noexcept
{
337 switch (static_cast<errc
>(ev
)) {
339 return bs::errc::operation_canceled
;
341 case errc::inconsistency
:
342 return bs::errc::io_error
;
344 case errc::entry_too_large
:
345 return bs::errc::value_too_large
;
347 case errc::invalid_marker
:
348 return bs::errc::invalid_argument
;
350 case errc::update_failed
:
351 return bs::errc::invalid_argument
;
354 return { ev
, *this };
// Reports whether error value `ev` is equivalent to condition `c`, defined
// here as: the default error condition computed for `ev` compares equal to
// `c`.
357 bool error_category::equivalent(int ev
, const bs::error_condition
& c
) const noexcept
{
358 return default_error_condition(ev
) == c
;
// Converts the error value `ev` (interpreted as `errc`) to an int by
// switching over the known enumerators.
//
// NOTE(review): only the case labels are visible in this excerpt; the value
// returned for each case and the default result are not shown -- presumably
// negative errno-style codes; confirm against the full source.
361 int error_category::from_code(int ev
) const noexcept
{
362 switch (static_cast<errc
>(ev
)) {
366 case errc::inconsistency
:
369 case errc::entry_too_large
:
372 case errc::invalid_marker
:
375 case errc::update_failed
:
382 const bs::error_category
& error_category() noexcept
{
383 static const class error_category c
;