]>
git.proxmox.com Git - ceph.git/blob - ceph/src/cls/fifo/cls_fifo_types.h
8a471828b7a5ac139b8876cebdc53d2aaa5822f0
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 * Ceph - scalable distributed file system
7 * Copyright (C) 2019 Red Hat, Inc.
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
25 #undef FMT_HEADER_ONLY
26 #define FMT_HEADER_ONLY 1
27 #include <fmt/format.h>
29 #include "include/buffer.h"
30 #include "include/encoding.h"
31 #include "include/types.h"
33 #include "common/ceph_time.h"
37 namespace rados::cls::fifo
{
42 void encode(ceph::buffer::list
& bl
) const {
43 ENCODE_START(1, 1, bl
);
48 void decode(ceph::buffer::list::const_iterator
& bl
) {
54 void dump(ceph::Formatter
* f
) const;
55 void decode_json(JSONObj
* obj
);
57 bool operator ==(const objv
& rhs
) const {
58 return (instance
== rhs
.instance
&&
61 bool operator !=(const objv
& rhs
) const {
62 return (instance
!= rhs
.instance
||
65 bool same_or_later(const objv
& rhs
) const {
66 return (instance
== rhs
.instance
||
71 return instance
.empty();
74 std::string
to_str() const {
75 return fmt::format("{}{{{}}}", instance
, ver
);
78 WRITE_CLASS_ENCODER(objv
)
79 inline std::ostream
& operator <<(std::ostream
& os
, const objv
& objv
)
81 return os
<< objv
.to_str();
85 std::uint64_t max_part_size
{0};
86 std::uint64_t max_entry_size
{0};
87 std::uint64_t full_size_threshold
{0};
89 void encode(ceph::buffer::list
& bl
) const {
90 ENCODE_START(1, 1, bl
);
91 encode(max_part_size
, bl
);
92 encode(max_entry_size
, bl
);
93 encode(full_size_threshold
, bl
);
96 void decode(ceph::buffer::list::const_iterator
& bl
) {
98 decode(max_part_size
, bl
);
99 decode(max_entry_size
, bl
);
100 decode(full_size_threshold
, bl
);
103 void dump(ceph::Formatter
* f
) const;
104 void decode_json(JSONObj
* obj
);
106 bool operator ==(const data_params
& rhs
) const {
107 return (max_part_size
== rhs
.max_part_size
&&
108 max_entry_size
== rhs
.max_entry_size
&&
109 full_size_threshold
== rhs
.full_size_threshold
);
112 WRITE_CLASS_ENCODER(data_params
)
113 inline std::ostream
& operator <<(std::ostream
& m
, const data_params
& d
) {
114 return m
<< "max_part_size: " << d
.max_part_size
<< ", "
115 << "max_entry_size: " << d
.max_entry_size
<< ", "
116 << "full_size_threshold: " << d
.full_size_threshold
;
119 struct journal_entry
{
127 std::int64_t part_num
{0};
128 std::string part_tag
;
130 void encode(ceph::buffer::list
& bl
) const {
131 ENCODE_START(1, 1, bl
);
133 encode(part_num
, bl
);
134 encode(part_tag
, bl
);
137 void decode(ceph::buffer::list::const_iterator
& bl
) {
141 op
= static_cast<Op
>(i
);
142 decode(part_num
, bl
);
143 decode(part_tag
, bl
);
146 void dump(ceph::Formatter
* f
) const;
148 bool operator ==(const journal_entry
& e
) {
149 return (op
== e
.op
&&
150 part_num
== e
.part_num
&&
151 part_tag
== e
.part_tag
);
154 WRITE_CLASS_ENCODER(journal_entry
)
155 inline std::ostream
& operator <<(std::ostream
& m
, const journal_entry::Op
& o
) {
157 case journal_entry::Op::unknown
:
158 return m
<< "Op::unknown";
159 case journal_entry::Op::create
:
160 return m
<< "Op::create";
161 case journal_entry::Op::set_head
:
162 return m
<< "Op::set_head";
163 case journal_entry::Op::remove
:
164 return m
<< "Op::remove";
166 return m
<< "Bad value: " << static_cast<int>(o
);
168 inline std::ostream
& operator <<(std::ostream
& m
, const journal_entry
& j
) {
169 return m
<< "op: " << j
.op
<< ", "
170 << "part_num: " << j
.part_num
<< ", "
171 << "part_tag: " << j
.part_tag
;
174 // This is actually a useful builder, since otherwise we end up with
175 // four uint64_ts in a row and only care about a subset at a time.
177 std::optional
<std::uint64_t> tail_part_num_
;
178 std::optional
<std::uint64_t> head_part_num_
;
179 std::optional
<std::uint64_t> min_push_part_num_
;
180 std::optional
<std::uint64_t> max_push_part_num_
;
181 std::vector
<fifo::journal_entry
> journal_entries_add_
;
182 std::vector
<fifo::journal_entry
> journal_entries_rm_
;
186 update
&& tail_part_num(std::optional
<std::uint64_t> num
) noexcept
{
187 tail_part_num_
= num
;
188 return std::move(*this);
190 auto tail_part_num() const noexcept
{
191 return tail_part_num_
;
194 update
&& head_part_num(std::optional
<std::uint64_t> num
) noexcept
{
195 head_part_num_
= num
;
196 return std::move(*this);
198 auto head_part_num() const noexcept
{
199 return head_part_num_
;
202 update
&& min_push_part_num(std::optional
<std::uint64_t> num
)
204 min_push_part_num_
= num
;
205 return std::move(*this);
207 auto min_push_part_num() const noexcept
{
208 return min_push_part_num_
;
211 update
&& max_push_part_num(std::optional
<std::uint64_t> num
) noexcept
{
212 max_push_part_num_
= num
;
213 return std::move(*this);
215 auto max_push_part_num() const noexcept
{
216 return max_push_part_num_
;
219 update
&& journal_entry_add(fifo::journal_entry entry
) {
220 journal_entries_add_
.push_back(std::move(entry
));
221 return std::move(*this);
223 update
&& journal_entries_add(
224 std::optional
<std::vector
<fifo::journal_entry
>>&& entries
) {
226 journal_entries_add_
= std::move(*entries
);
228 journal_entries_add_
.clear();
230 return std::move(*this);
232 const auto& journal_entries_add() const & noexcept
{
233 return journal_entries_add_
;
235 auto&& journal_entries_add() && noexcept
{
236 return std::move(journal_entries_add_
);
239 update
&& journal_entry_rm(fifo::journal_entry entry
) {
240 journal_entries_rm_
.push_back(std::move(entry
));
241 return std::move(*this);
243 update
&& journal_entries_rm(
244 std::optional
<std::vector
<fifo::journal_entry
>>&& entries
) {
246 journal_entries_rm_
= std::move(*entries
);
248 journal_entries_rm_
.clear();
250 return std::move(*this);
252 const auto& journal_entries_rm() const & noexcept
{
253 return journal_entries_rm_
;
255 auto&& journal_entries_rm() && noexcept
{
256 return std::move(journal_entries_rm_
);
258 friend std::ostream
& operator <<(std::ostream
& m
, const update
& u
);
260 inline std::ostream
& operator <<(std::ostream
& m
, const update
& u
) {
262 if (u
.tail_part_num_
) {
263 m
<< "tail_part_num: " << *u
.tail_part_num_
;
266 if (u
.head_part_num_
) {
269 m
<< "head_part_num: " << *u
.head_part_num_
;
272 if (u
.min_push_part_num_
) {
275 m
<< "min_push_part_num: " << *u
.min_push_part_num_
;
278 if (u
.max_push_part_num_
) {
281 m
<< "max_push_part_num: " << *u
.max_push_part_num_
;
284 if (!u
.journal_entries_add_
.empty()) {
287 m
<< "journal_entries_add: {" << u
.journal_entries_add_
<< "}";
290 if (!u
.journal_entries_rm_
.empty()) {
293 m
<< "journal_entries_rm: {" << u
.journal_entries_rm_
<< "}";
304 std::string oid_prefix
;
307 std::int64_t tail_part_num
{0};
308 std::int64_t head_part_num
{-1};
309 std::int64_t min_push_part_num
{0};
310 std::int64_t max_push_part_num
{-1};
312 std::string head_tag
;
313 std::map
<int64_t, std::string
> tags
;
315 std::multimap
<int64_t, journal_entry
> journal
;
317 bool need_new_head() const {
318 return (head_part_num
< min_push_part_num
);
321 bool need_new_part() const {
322 return (max_push_part_num
< min_push_part_num
);
325 void encode(ceph::buffer::list
& bl
) const {
326 ENCODE_START(1, 1, bl
);
329 encode(oid_prefix
, bl
);
331 encode(tail_part_num
, bl
);
332 encode(head_part_num
, bl
);
333 encode(min_push_part_num
, bl
);
334 encode(max_push_part_num
, bl
);
336 encode(head_tag
, bl
);
340 void decode(ceph::buffer::list::const_iterator
& bl
) {
344 decode(oid_prefix
, bl
);
346 decode(tail_part_num
, bl
);
347 decode(head_part_num
, bl
);
348 decode(min_push_part_num
, bl
);
349 decode(max_push_part_num
, bl
);
351 decode(head_tag
, bl
);
355 void dump(ceph::Formatter
* f
) const;
356 void decode_json(JSONObj
* obj
);
358 std::string
part_oid(std::int64_t part_num
) const {
359 return fmt::format("{}.{}", oid_prefix
, part_num
);
362 journal_entry
next_journal_entry(std::string tag
) const {
364 entry
.op
= journal_entry::Op::create
;
365 entry
.part_num
= max_push_part_num
+ 1;
366 entry
.part_tag
= std::move(tag
);
370 std::optional
<std::string
>
371 apply_update(const update
& update
) {
372 if (update
.tail_part_num()) {
373 tail_part_num
= *update
.tail_part_num();
376 if (update
.min_push_part_num()) {
377 min_push_part_num
= *update
.min_push_part_num();
380 if (update
.max_push_part_num()) {
381 max_push_part_num
= *update
.max_push_part_num();
384 for (const auto& entry
: update
.journal_entries_add()) {
385 auto iter
= journal
.find(entry
.part_num
);
386 if (iter
!= journal
.end() &&
387 iter
->second
.op
== entry
.op
) {
388 /* don't allow multiple concurrent (same) operations on the same part,
389 racing clients should use objv to avoid races anyway */
390 return fmt::format("multiple concurrent operations on same part are not "
391 "allowed, part num={}", entry
.part_num
);
394 if (entry
.op
== journal_entry::Op::create
) {
395 tags
[entry
.part_num
] = entry
.part_tag
;
398 journal
.emplace(entry
.part_num
, entry
);
401 for (const auto& entry
: update
.journal_entries_rm()) {
402 journal
.erase(entry
.part_num
);
405 if (update
.head_part_num()) {
406 tags
.erase(head_part_num
);
407 head_part_num
= *update
.head_part_num();
408 auto iter
= tags
.find(head_part_num
);
409 if (iter
!= tags
.end()) {
410 head_tag
= iter
->second
;
419 WRITE_CLASS_ENCODER(info
)
420 inline std::ostream
& operator <<(std::ostream
& m
, const info
& i
) {
421 return m
<< "id: " << i
.id
<< ", "
422 << "version: " << i
.version
<< ", "
423 << "oid_prefix: " << i
.oid_prefix
<< ", "
424 << "params: {" << i
.params
<< "}, "
425 << "tail_part_num: " << i
.tail_part_num
<< ", "
426 << "head_part_num: " << i
.head_part_num
<< ", "
427 << "min_push_part_num: " << i
.min_push_part_num
<< ", "
428 << "max_push_part_num: " << i
.max_push_part_num
<< ", "
429 << "head_tag: " << i
.head_tag
<< ", "
430 << "tags: {" << i
.tags
<< "}, "
431 << "journal: {" << i
.journal
;
434 struct part_list_entry
{
435 ceph::buffer::list data
;
436 std::uint64_t ofs
= 0;
437 ceph::real_time mtime
;
440 part_list_entry(ceph::buffer::list
&& data
,
442 ceph::real_time mtime
)
443 : data(std::move(data
)), ofs(ofs
), mtime(mtime
) {}
446 void encode(ceph::buffer::list
& bl
) const {
447 ENCODE_START(1, 1, bl
);
453 void decode(ceph::buffer::list::const_iterator
& bl
) {
461 WRITE_CLASS_ENCODER(part_list_entry
)
462 inline std::ostream
& operator <<(std::ostream
& m
,
463 const part_list_entry
& p
) {
464 using ceph::operator <<;
465 return m
<< "data: " << p
.data
<< ", "
466 << "ofs: " << p
.ofs
<< ", "
467 << "mtime: " << p
.mtime
;
475 std::uint64_t magic
{0};
477 std::uint64_t min_ofs
{0};
478 std::uint64_t last_ofs
{0};
479 std::uint64_t next_ofs
{0};
480 std::uint64_t min_index
{0};
481 std::uint64_t max_index
{0};
482 ceph::real_time max_time
;
484 void encode(ceph::buffer::list
& bl
) const {
485 ENCODE_START(1, 1, bl
);
490 encode(last_ofs
, bl
);
491 encode(next_ofs
, bl
);
492 encode(min_index
, bl
);
493 encode(max_index
, bl
);
494 encode(max_time
, bl
);
497 void decode(ceph::buffer::list::const_iterator
& bl
) {
503 decode(last_ofs
, bl
);
504 decode(next_ofs
, bl
);
505 decode(min_index
, bl
);
506 decode(max_index
, bl
);
507 decode(max_time
, bl
);
511 WRITE_CLASS_ENCODER(part_header
)
512 inline std::ostream
& operator <<(std::ostream
& m
, const part_header
& p
) {
513 using ceph::operator <<;
514 return m
<< "tag: " << p
.tag
<< ", "
515 << "params: {" << p
.params
<< "}, "
516 << "magic: " << p
.magic
<< ", "
517 << "min_ofs: " << p
.min_ofs
<< ", "
518 << "last_ofs: " << p
.last_ofs
<< ", "
519 << "next_ofs: " << p
.next_ofs
<< ", "
520 << "min_index: " << p
.min_index
<< ", "
521 << "max_index: " << p
.max_index
<< ", "
522 << "max_time: " << p
.max_time
;
524 } // namespace rados::cls::fifo