1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 * This is an OSD class that implements methods for management
15 #undef FMT_HEADER_ONLY
16 #define FMT_HEADER_ONLY 1
17 #include <fmt/format.h>
19 #include "include/buffer.h"
20 #include "include/types.h"
22 #include "objclass/objclass.h"
24 #include "cls/fifo/cls_fifo_ops.h"
25 #include "cls/fifo/cls_fifo_types.h"
30 namespace rados::cls::fifo
{
32 static constexpr auto CLS_FIFO_MAX_PART_HEADER_SIZE
= 512;
34 static std::uint32_t part_entry_overhead
;
36 struct entry_header_pre
{
39 ceph_le64 header_size
;
43 } __attribute__ ((packed
));
46 ceph::real_time mtime
;
48 void encode(ceph::buffer::list
& bl
) const {
49 ENCODE_START(1, 1, bl
);
53 void decode(ceph::buffer::list::const_iterator
& bl
) {
59 WRITE_CLASS_ENCODER(entry_header
)
63 std::string
new_oid_prefix(std::string id
, std::optional
<std::string
>& val
)
65 static constexpr auto PREFIX_RND_SIZE
= 12;
70 char buf
[PREFIX_RND_SIZE
+ 1];
71 buf
[PREFIX_RND_SIZE
] = 0;
73 cls_gen_rand_base64(buf
, sizeof(buf
) - 1);
75 return fmt::format("{}.{}", id
, buf
);
78 int write_header(cls_method_context_t hctx
,
82 static constexpr auto HEADER_INSTANCE_SIZE
= 16;
83 if (header
.version
.instance
.empty()) {
84 char buf
[HEADER_INSTANCE_SIZE
+ 1];
85 buf
[HEADER_INSTANCE_SIZE
] = 0;
86 cls_gen_rand_base64(buf
, sizeof(buf
) - 1);
87 header
.version
.instance
= buf
;
92 ceph::buffer::list bl
;
94 return cls_cxx_write_full(hctx
, &bl
);
97 int read_part_header(cls_method_context_t hctx
,
98 part_header
* part_header
)
100 ceph::buffer::list bl
;
101 int r
= cls_cxx_read2(hctx
, 0, CLS_FIFO_MAX_PART_HEADER_SIZE
, &bl
,
102 CEPH_OSD_OP_FLAG_FADVISE_WILLNEED
);
104 CLS_ERR("ERROR: %s: cls_cxx_read2() on obj returned %d", __PRETTY_FUNCTION__
, r
);
108 auto iter
= bl
.cbegin();
110 decode(*part_header
, iter
);
111 } catch (const ceph::buffer::error
& err
) {
112 CLS_ERR("ERROR: %s: failed decoding part header", __PRETTY_FUNCTION__
);
116 using ceph::operator <<;
117 std::ostringstream ss
;
118 ss
<< part_header
->max_time
;
119 CLS_LOG(5, "%s:%d read part_header:\n"
121 "\tmagic=0x%" PRIx64
"\n"
122 "\tmin_ofs=%" PRId64
"\n"
123 "\tlast_ofs=%" PRId64
"\n"
124 "\tnext_ofs=%" PRId64
"\n"
125 "\tmin_index=%" PRId64
"\n"
126 "\tmax_index=%" PRId64
"\n"
128 __PRETTY_FUNCTION__
, __LINE__
,
129 part_header
->tag
.c_str(),
131 part_header
->min_ofs
,
132 part_header
->last_ofs
,
133 part_header
->next_ofs
,
134 part_header
->min_index
,
135 part_header
->max_index
,
141 int write_part_header(cls_method_context_t hctx
,
142 part_header
& part_header
)
144 ceph::buffer::list bl
;
145 encode(part_header
, bl
);
147 if (bl
.length() > CLS_FIFO_MAX_PART_HEADER_SIZE
) {
148 CLS_ERR("%s: cannot write part header, buffer exceeds max size", __PRETTY_FUNCTION__
);
152 int r
= cls_cxx_write2(hctx
, 0, bl
.length(),
153 &bl
, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED
);
155 CLS_ERR("%s: failed to write part header: r=%d",
156 __PRETTY_FUNCTION__
, r
);
163 int read_header(cls_method_context_t hctx
,
164 std::optional
<objv
> objv
,
165 info
* info
, bool get_info
= false)
169 int r
= cls_cxx_stat2(hctx
, &size
, nullptr);
171 CLS_ERR("ERROR: %s: cls_cxx_stat2() on obj returned %d", __PRETTY_FUNCTION__
, r
);
175 ceph::buffer::list bl
;
176 r
= cls_cxx_read2(hctx
, 0, size
, &bl
, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED
);
178 CLS_ERR("ERROR: %s: cls_cxx_read2() on obj returned %d", __PRETTY_FUNCTION__
, r
);
184 CLS_LOG(5, "%s: Zero length object, likely probe, returning ENODATA", __PRETTY_FUNCTION__
);
186 CLS_ERR("ERROR: %s: Zero length object, returning ENODATA", __PRETTY_FUNCTION__
);
192 auto iter
= bl
.cbegin();
194 } catch (const ceph::buffer::error
& err
) {
195 CLS_ERR("ERROR: %s: failed decoding header", __PRETTY_FUNCTION__
);
199 if (objv
&& !(info
->version
== *objv
)) {
200 auto s1
= info
->version
.to_str();
201 auto s2
= objv
->to_str();
202 CLS_ERR("%s: version mismatch (header=%s, req=%s), canceled operation",
203 __PRETTY_FUNCTION__
, s1
.c_str(), s2
.c_str());
210 int create_meta(cls_method_context_t hctx
,
211 ceph::buffer::list
* in
, ceph::buffer::list
* out
)
213 CLS_LOG(5, "%s", __PRETTY_FUNCTION__
);
217 auto iter
= in
->cbegin();
219 } catch (const ceph::buffer::error
& err
) {
220 CLS_ERR("ERROR: %s: failed to decode request: %s", __PRETTY_FUNCTION__
,
226 CLS_ERR("%s: ID cannot be empty", __PRETTY_FUNCTION__
);
230 if (op
.max_part_size
== 0 ||
231 op
.max_entry_size
== 0 ||
232 op
.max_entry_size
> op
.max_part_size
) {
233 CLS_ERR("ERROR: %s: invalid dimensions.", __PRETTY_FUNCTION__
);
239 int r
= cls_cxx_stat2(hctx
, &size
, nullptr);
240 if (r
< 0 && r
!= -ENOENT
) {
241 CLS_ERR("ERROR: %s: cls_cxx_stat2() on obj returned %d",
242 __PRETTY_FUNCTION__
, r
);
245 if (op
.exclusive
&& r
== 0) {
246 CLS_ERR("%s: exclusive create but queue already exists",
247 __PRETTY_FUNCTION__
);
252 CLS_LOG(5, "%s: FIFO already exists, reading from disk and comparing.",
253 __PRETTY_FUNCTION__
);
254 ceph::buffer::list bl
;
255 r
= cls_cxx_read2(hctx
, 0, size
, &bl
, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED
);
257 CLS_ERR("ERROR: %s: cls_cxx_read2() on obj returned %d",
258 __PRETTY_FUNCTION__
, r
);
264 auto iter
= bl
.cbegin();
265 decode(header
, iter
);
266 } catch (const ceph::buffer::error
& err
) {
267 CLS_ERR("ERROR: %s: failed decoding header: %s",
268 __PRETTY_FUNCTION__
, err
.what());
272 if (!(header
.id
== op
.id
&&
274 header
.oid_prefix
== *op
.oid_prefix
) &&
276 header
.version
== *op
.version
))) {
277 CLS_ERR("%s: failed to re-create existing queue "
278 "with different params", __PRETTY_FUNCTION__
);
282 return 0; /* already exists */
288 header
.version
= *op
.version
;
290 static constexpr auto DEFAULT_INSTANCE_SIZE
= 16;
291 char buf
[DEFAULT_INSTANCE_SIZE
+ 1];
292 cls_gen_rand_base64(buf
, sizeof(buf
));
293 buf
[DEFAULT_INSTANCE_SIZE
] = '\0';
294 header
.version
.instance
= buf
;
295 header
.version
.ver
= 1;
297 header
.oid_prefix
= new_oid_prefix(op
.id
, op
.oid_prefix
);
299 header
.params
.max_part_size
= op
.max_part_size
;
300 header
.params
.max_entry_size
= op
.max_entry_size
;
301 header
.params
.full_size_threshold
= op
.max_part_size
- op
.max_entry_size
- part_entry_overhead
;
303 r
= write_header(hctx
, header
, false);
305 CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__
, r
);
312 int update_meta(cls_method_context_t hctx
, ceph::buffer::list
* in
,
313 ceph::buffer::list
* out
)
315 CLS_LOG(5, "%s", __PRETTY_FUNCTION__
);
319 auto iter
= in
->cbegin();
321 } catch (const ceph::buffer::error
& err
) {
322 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__
);
326 if (op
.version
.empty()) {
327 CLS_ERR("%s: no version supplied", __PRETTY_FUNCTION__
);
333 int r
= read_header(hctx
, op
.version
, &header
);
338 auto u
= fifo::update().tail_part_num(op
.tail_part_num
)
339 .head_part_num(op
.head_part_num
)
340 .min_push_part_num(op
.min_push_part_num
)
341 .max_push_part_num(op
.max_push_part_num
)
342 .journal_entries_add(
343 std::move(op
.journal_entries_add
))
345 std::move(op
.journal_entries_rm
));
347 auto err
= header
.apply_update(u
);
349 std::ostringstream ss
;
351 CLS_ERR("%s: %s: %s", __PRETTY_FUNCTION__
, err
->c_str(),
356 r
= write_header(hctx
, header
);
358 CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__
, r
);
365 int get_meta(cls_method_context_t hctx
, ceph::buffer::list
* in
,
366 ceph::buffer::list
* out
)
368 CLS_LOG(5, "%s", __PRETTY_FUNCTION__
);
372 auto iter
= in
->cbegin();
374 } catch (const ceph::buffer::error
&err
) {
375 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__
);
379 op::get_meta_reply reply
;
380 int r
= read_header(hctx
, op
.version
, &reply
.info
, true);
385 reply
.part_header_size
= CLS_FIFO_MAX_PART_HEADER_SIZE
;
386 reply
.part_entry_overhead
= part_entry_overhead
;
393 int init_part(cls_method_context_t hctx
, ceph::buffer::list
* in
,
394 ceph::buffer::list
*out
)
396 CLS_LOG(5, "%s", __PRETTY_FUNCTION__
);
400 auto iter
= in
->cbegin();
402 } catch (const ceph::buffer::error
&err
) {
403 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__
);
409 if (op
.tag
.empty()) {
410 CLS_ERR("%s: tag required", __PRETTY_FUNCTION__
);
414 int r
= cls_cxx_stat2(hctx
, &size
, nullptr);
415 if (r
< 0 && r
!= -ENOENT
) {
416 CLS_ERR("ERROR: %s: cls_cxx_stat2() on obj returned %d", __PRETTY_FUNCTION__
, r
);
419 if (r
== 0 && size
> 0) {
420 part_header part_header
;
421 r
= read_part_header(hctx
, &part_header
);
423 CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__
);
427 if (!(part_header
.tag
== op
.tag
&&
428 part_header
.params
== op
.params
)) {
429 CLS_ERR("%s: failed to re-create existing part with different "
430 "params", __PRETTY_FUNCTION__
);
434 return 0; /* already exists */
437 part_header part_header
;
439 part_header
.tag
= op
.tag
;
440 part_header
.params
= op
.params
;
442 part_header
.min_ofs
= CLS_FIFO_MAX_PART_HEADER_SIZE
;
443 part_header
.last_ofs
= 0;
444 part_header
.next_ofs
= part_header
.min_ofs
;
445 part_header
.max_time
= ceph::real_clock::now();
447 cls_gen_random_bytes(reinterpret_cast<char *>(&part_header
.magic
),
448 sizeof(part_header
.magic
));
450 r
= write_part_header(hctx
, part_header
);
452 CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__
, r
);
459 bool full_part(const part_header
& part_header
)
461 return (part_header
.next_ofs
> part_header
.params
.full_size_threshold
);
464 int push_part(cls_method_context_t hctx
, ceph::buffer::list
* in
,
465 ceph::buffer::list
* out
)
467 CLS_LOG(5, "%s", __PRETTY_FUNCTION__
);
471 auto iter
= in
->cbegin();
473 } catch (const ceph::buffer::error
& err
) {
474 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__
);
478 if (op
.tag
.empty()) {
479 CLS_ERR("%s: tag required", __PRETTY_FUNCTION__
);
483 part_header part_header
;
484 int r
= read_part_header(hctx
, &part_header
);
486 CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__
);
490 if (!(part_header
.tag
== op
.tag
)) {
491 CLS_ERR("%s: bad tag", __PRETTY_FUNCTION__
);
495 std::uint64_t effective_len
= op
.total_len
+ op
.data_bufs
.size() *
498 if (effective_len
> part_header
.params
.max_part_size
) {
502 if (full_part(part_header
)) {
506 auto now
= ceph::real_clock::now();
507 struct entry_header entry_header
= { now
};
508 ceph::buffer::list entry_header_bl
;
509 encode(entry_header
, entry_header_bl
);
511 auto max_index
= part_header
.max_index
;
512 const auto write_ofs
= part_header
.next_ofs
;
513 auto ofs
= part_header
.next_ofs
;
515 entry_header_pre pre_header
;
516 pre_header
.magic
= part_header
.magic
;
517 pre_header
.pre_size
= sizeof(pre_header
);
518 pre_header
.reserved
= 0;
520 std::uint64_t total_data
= 0;
521 for (auto& data
: op
.data_bufs
) {
522 total_data
+= data
.length();
524 if (total_data
!= op
.total_len
) {
525 CLS_ERR("%s: length mismatch: op.total_len=%" PRId64
526 " total data received=%" PRId64
,
527 __PRETTY_FUNCTION__
, op
.total_len
, total_data
);
532 int entries_pushed
= 0;
533 ceph::buffer::list all_data
;
534 for (auto& data
: op
.data_bufs
) {
535 if (full_part(part_header
))
538 pre_header
.header_size
= entry_header_bl
.length();
539 pre_header
.data_size
= data
.length();
540 pre_header
.index
= max_index
;
542 bufferptr
pre(reinterpret_cast<char*>(&pre_header
), sizeof(pre_header
));
543 auto entry_write_len
= pre
.length() + entry_header_bl
.length() + data
.length();
544 all_data
.append(pre
);
545 all_data
.append(entry_header_bl
);
546 all_data
.claim_append(data
);
548 part_header
.last_ofs
= ofs
;
549 ofs
+= entry_write_len
;
552 part_header
.max_index
= max_index
;
553 part_header
.next_ofs
= ofs
;
555 part_header
.max_time
= now
;
557 auto write_len
= all_data
.length();
559 r
= cls_cxx_write2(hctx
, write_ofs
, write_len
,
560 &all_data
, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED
);
563 CLS_ERR("%s: failed to write entries (ofs=%" PRIu64
564 " len=%u): r=%d", __PRETTY_FUNCTION__
, write_ofs
,
570 r
= write_part_header(hctx
, part_header
);
572 CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__
, r
);
576 if (entries_pushed
== 0) {
577 CLS_ERR("%s: pushed no entries? Can't happen!", __PRETTY_FUNCTION__
);
581 return entries_pushed
;
585 static constexpr std::uint64_t prefetch_len
= (128 * 1024);
587 cls_method_context_t hctx
;
589 const fifo::part_header
& part_header
;
592 ceph::buffer::list data
;
594 int fetch(std::uint64_t num_bytes
);
595 int read(std::uint64_t num_bytes
, ceph::buffer::list
* pbl
);
596 int peek(std::uint64_t num_bytes
, char *dest
);
597 int seek(std::uint64_t num_bytes
);
600 EntryReader(cls_method_context_t hctx
,
601 const fifo::part_header
& part_header
,
602 uint64_t ofs
) : hctx(hctx
),
603 part_header(part_header
),
604 ofs(ofs
< part_header
.min_ofs
?
605 part_header
.min_ofs
:
608 std::uint64_t get_ofs() const {
613 return (ofs
>= part_header
.next_ofs
);
616 int peek_pre_header(entry_header_pre
* pre_header
);
617 int get_next_entry(ceph::buffer::list
* pbl
,
619 ceph::real_time
* pmtime
);
623 int EntryReader::fetch(std::uint64_t num_bytes
)
625 CLS_LOG(5, "%s: fetch %d bytes, ofs=%d data.length()=%d", __PRETTY_FUNCTION__
, (int)num_bytes
, (int)ofs
, (int)data
.length());
626 if (data
.length() < num_bytes
) {
627 ceph::buffer::list bl
;
628 CLS_LOG(5, "%s: reading % " PRId64
" bytes at ofs=%" PRId64
, __PRETTY_FUNCTION__
,
629 prefetch_len
, ofs
+ data
.length());
630 int r
= cls_cxx_read2(hctx
, ofs
+ data
.length(), prefetch_len
, &bl
, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED
);
632 CLS_ERR("ERROR: %s: cls_cxx_read2() on obj returned %d", __PRETTY_FUNCTION__
, r
);
635 data
.claim_append(bl
);
638 if (static_cast<unsigned>(num_bytes
) > data
.length()) {
639 CLS_ERR("%s: requested %" PRId64
" bytes, but only "
640 "%u were available", __PRETTY_FUNCTION__
, num_bytes
, data
.length());
647 int EntryReader::read(std::uint64_t num_bytes
, ceph::buffer::list
* pbl
)
649 int r
= fetch(num_bytes
);
653 data
.splice(0, num_bytes
, pbl
);
660 int EntryReader::peek(std::uint64_t num_bytes
, char* dest
)
662 int r
= fetch(num_bytes
);
667 data
.begin().copy(num_bytes
, dest
);
672 int EntryReader::seek(std::uint64_t num_bytes
)
674 ceph::buffer::list bl
;
676 CLS_LOG(5, "%s:%d: num_bytes=%" PRIu64
, __PRETTY_FUNCTION__
, __LINE__
, num_bytes
);
677 return read(num_bytes
, &bl
);
680 int EntryReader::peek_pre_header(entry_header_pre
* pre_header
)
686 int r
= peek(sizeof(*pre_header
),
687 reinterpret_cast<char*>(pre_header
));
689 CLS_ERR("ERROR: %s: peek() size=%zu failed: r=%d", __PRETTY_FUNCTION__
,
690 sizeof(pre_header
), r
);
694 if (pre_header
->magic
!= part_header
.magic
) {
695 CLS_ERR("ERROR: %s: unexpected pre_header magic", __PRETTY_FUNCTION__
);
703 int EntryReader::get_next_entry(ceph::buffer::list
* pbl
,
705 ceph::real_time
* pmtime
)
707 entry_header_pre pre_header
;
708 int r
= peek_pre_header(&pre_header
);
710 CLS_ERR("ERROR: %s: peek_pre_header() failed: r=%d", __PRETTY_FUNCTION__
, r
);
718 CLS_LOG(5, "%s:%d: pre_header.pre_size=%" PRIu64
, __PRETTY_FUNCTION__
, __LINE__
,
719 uint64_t(pre_header
.pre_size
));
720 r
= seek(pre_header
.pre_size
);
722 CLS_ERR("ERROR: %s: failed to seek: r=%d", __PRETTY_FUNCTION__
, r
);
726 ceph::buffer::list header
;
727 CLS_LOG(5, "%s:%d: pre_header.header_size=%d", __PRETTY_FUNCTION__
, __LINE__
, (int)pre_header
.header_size
);
728 r
= read(pre_header
.header_size
, &header
);
730 CLS_ERR("ERROR: %s: failed to read entry header: r=%d", __PRETTY_FUNCTION__
, r
);
734 entry_header entry_header
;
735 auto iter
= header
.cbegin();
737 decode(entry_header
, iter
);
738 } catch (ceph::buffer::error
& err
) {
739 CLS_ERR("%s: failed decoding entry header", __PRETTY_FUNCTION__
);
744 *pmtime
= entry_header
.mtime
;
748 r
= read(pre_header
.data_size
, pbl
);
750 CLS_ERR("%s: failed reading data: r=%d", __PRETTY_FUNCTION__
, r
);
754 r
= seek(pre_header
.data_size
);
756 CLS_ERR("ERROR: %s: failed to seek: r=%d", __PRETTY_FUNCTION__
, r
);
764 int trim_part(cls_method_context_t hctx
,
765 ceph::buffer::list
*in
, ceph::buffer::list
*out
)
767 CLS_LOG(5, "%s", __PRETTY_FUNCTION__
);
771 auto iter
= in
->cbegin();
773 } catch (const ceph::buffer::error
&err
) {
774 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__
);
778 part_header part_header
;
779 int r
= read_part_header(hctx
, &part_header
);
781 CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__
);
786 !(part_header
.tag
== *op
.tag
)) {
787 CLS_ERR("%s: bad tag", __PRETTY_FUNCTION__
);
791 if (op
.ofs
< part_header
.min_ofs
) {
794 if (op
.exclusive
&& op
.ofs
== part_header
.min_ofs
) {
798 if (op
.ofs
>= part_header
.next_ofs
) {
799 if (full_part(part_header
)) {
801 * trim full part completely: remove object
804 r
= cls_cxx_remove(hctx
);
806 CLS_ERR("%s: ERROR: cls_cxx_remove() returned r=%d", __PRETTY_FUNCTION__
, r
);
813 part_header
.min_ofs
= part_header
.next_ofs
;
814 part_header
.min_index
= part_header
.max_index
;
816 EntryReader
reader(hctx
, part_header
, op
.ofs
);
818 entry_header_pre pre_header
;
819 int r
= reader
.peek_pre_header(&pre_header
);
825 part_header
.min_index
= pre_header
.index
;
827 r
= reader
.get_next_entry(nullptr, nullptr, nullptr);
829 CLS_ERR("ERROR: %s: unexpected failure at get_next_entry: r=%d",
830 __PRETTY_FUNCTION__
, r
);
833 part_header
.min_index
= pre_header
.index
+ 1;
836 part_header
.min_ofs
= reader
.get_ofs();
839 r
= write_part_header(hctx
, part_header
);
841 CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__
, r
);
848 int list_part(cls_method_context_t hctx
, ceph::buffer::list
* in
,
849 ceph::buffer::list
* out
)
851 CLS_LOG(5, "%s", __PRETTY_FUNCTION__
);
855 auto iter
= in
->cbegin();
857 } catch (const buffer::error
&err
) {
858 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__
);
862 part_header part_header
;
863 int r
= read_part_header(hctx
, &part_header
);
865 CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__
);
870 !(part_header
.tag
== *op
.tag
)) {
871 CLS_ERR("%s: bad tag", __PRETTY_FUNCTION__
);
875 EntryReader
reader(hctx
, part_header
, op
.ofs
);
877 if (op
.ofs
>= part_header
.min_ofs
&&
879 r
= reader
.get_next_entry(nullptr, nullptr, nullptr);
881 CLS_ERR("ERROR: %s: unexpected failure at get_next_entry: r=%d", __PRETTY_FUNCTION__
, r
);
886 op::list_part_reply reply
;
888 reply
.tag
= part_header
.tag
;
890 auto max_entries
= std::min(op
.max_entries
, op::MAX_LIST_ENTRIES
);
892 for (int i
= 0; i
< max_entries
&& !reader
.end(); ++i
) {
893 ceph::buffer::list data
;
894 ceph::real_time mtime
;
897 r
= reader
.get_next_entry(&data
, &ofs
, &mtime
);
899 CLS_ERR("ERROR: %s: unexpected failure at get_next_entry: r=%d",
900 __PRETTY_FUNCTION__
, r
);
904 reply
.entries
.emplace_back(std::move(data
), ofs
, mtime
);
907 reply
.more
= !reader
.end();
908 reply
.full_part
= full_part(part_header
);
915 int get_part_info(cls_method_context_t hctx
, ceph::buffer::list
*in
,
916 ceph::buffer::list
*out
)
918 CLS_LOG(5, "%s", __PRETTY_FUNCTION__
);
920 op::get_part_info op
;
922 auto iter
= in
->cbegin();
924 } catch (const ceph::buffer::error
&err
) {
925 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__
);
929 op::get_part_info_reply reply
;
931 int r
= read_part_header(hctx
, &reply
.header
);
933 CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__
);
942 } // namespace rados::cls::fifo
946 using namespace rados::cls::fifo
;
947 CLS_LOG(10, "Loaded fifo class!");
949 cls_handle_t h_class
;
950 cls_method_handle_t h_create_meta
;
951 cls_method_handle_t h_get_meta
;
952 cls_method_handle_t h_update_meta
;
953 cls_method_handle_t h_init_part
;
954 cls_method_handle_t h_push_part
;
955 cls_method_handle_t h_trim_part
;
956 cls_method_handle_t h_list_part
;
957 cls_method_handle_t h_get_part_info
;
959 cls_register(op::CLASS
, &h_class
);
960 cls_register_cxx_method(h_class
, op::CREATE_META
,
961 CLS_METHOD_RD
| CLS_METHOD_WR
,
962 create_meta
, &h_create_meta
);
964 cls_register_cxx_method(h_class
, op::GET_META
,
966 get_meta
, &h_get_meta
);
968 cls_register_cxx_method(h_class
, op::UPDATE_META
,
969 CLS_METHOD_RD
| CLS_METHOD_WR
,
970 update_meta
, &h_update_meta
);
972 cls_register_cxx_method(h_class
, op::INIT_PART
,
973 CLS_METHOD_RD
| CLS_METHOD_WR
,
974 init_part
, &h_init_part
);
976 cls_register_cxx_method(h_class
, op::PUSH_PART
,
977 CLS_METHOD_RD
| CLS_METHOD_WR
,
978 push_part
, &h_push_part
);
980 cls_register_cxx_method(h_class
, op::TRIM_PART
,
981 CLS_METHOD_RD
| CLS_METHOD_WR
,
982 trim_part
, &h_trim_part
);
984 cls_register_cxx_method(h_class
, op::LIST_PART
,
986 list_part
, &h_list_part
);
988 cls_register_cxx_method(h_class
, op::GET_PART_INFO
,
990 get_part_info
, &h_get_part_info
);
992 /* calculate entry overhead */
993 struct entry_header entry_header
;
994 ceph::buffer::list entry_header_bl
;
995 encode(entry_header
, entry_header_bl
);
997 part_entry_overhead
= sizeof(entry_header_pre
) + entry_header_bl
.length();