1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 * This is an OSD class that implements methods for management
15 #undef FMT_HEADER_ONLY
16 #define FMT_HEADER_ONLY 1
17 #include <fmt/format.h>
19 #include "include/buffer.h"
20 #include "include/types.h"
22 #include "objclass/objclass.h"
24 #include "cls/fifo/cls_fifo_ops.h"
25 #include "cls/fifo/cls_fifo_types.h"
30 namespace rados::cls::fifo
{
32 static constexpr auto CLS_FIFO_MAX_PART_HEADER_SIZE
= 512;
34 static std::uint32_t part_entry_overhead
;
36 struct entry_header_pre
{
39 ceph_le64 header_size
;
43 } __attribute__ ((packed
));
46 ceph::real_time mtime
;
48 void encode(ceph::buffer::list
& bl
) const {
49 ENCODE_START(1, 1, bl
);
53 void decode(ceph::buffer::list::const_iterator
& bl
) {
59 WRITE_CLASS_ENCODER(entry_header
)
63 std::string
new_oid_prefix(std::string id
, std::optional
<std::string
>& val
)
65 static constexpr auto PREFIX_RND_SIZE
= 12;
70 char buf
[PREFIX_RND_SIZE
+ 1];
71 buf
[PREFIX_RND_SIZE
] = 0;
73 cls_gen_rand_base64(buf
, sizeof(buf
) - 1);
75 return fmt::format("{}.{}", id
, buf
);
78 int write_header(cls_method_context_t hctx
,
82 static constexpr auto HEADER_INSTANCE_SIZE
= 16;
83 if (header
.version
.instance
.empty()) {
84 char buf
[HEADER_INSTANCE_SIZE
+ 1];
85 buf
[HEADER_INSTANCE_SIZE
] = 0;
86 cls_gen_rand_base64(buf
, sizeof(buf
) - 1);
87 header
.version
.instance
= buf
;
92 ceph::buffer::list bl
;
94 return cls_cxx_write_full(hctx
, &bl
);
97 int read_part_header(cls_method_context_t hctx
,
98 part_header
* part_header
)
100 ceph::buffer::list bl
;
101 int r
= cls_cxx_read2(hctx
, 0, CLS_FIFO_MAX_PART_HEADER_SIZE
, &bl
,
102 CEPH_OSD_OP_FLAG_FADVISE_WILLNEED
);
104 CLS_ERR("ERROR: %s: cls_cxx_read2() on obj returned %d", __PRETTY_FUNCTION__
, r
);
108 auto iter
= bl
.cbegin();
110 decode(*part_header
, iter
);
111 } catch (const ceph::buffer::error
& err
) {
112 CLS_ERR("ERROR: %s: failed decoding part header", __PRETTY_FUNCTION__
);
116 using ceph::operator <<;
117 std::ostringstream ss
;
118 ss
<< part_header
->max_time
;
119 CLS_LOG(5, "%s:%d read part_header:\n"
121 "\tmagic=0x%" PRIx64
"\n"
122 "\tmin_ofs=%" PRId64
"\n"
123 "\tlast_ofs=%" PRId64
"\n"
124 "\tnext_ofs=%" PRId64
"\n"
125 "\tmin_index=%" PRId64
"\n"
126 "\tmax_index=%" PRId64
"\n"
128 __PRETTY_FUNCTION__
, __LINE__
,
129 part_header
->tag
.c_str(),
131 part_header
->min_ofs
,
132 part_header
->last_ofs
,
133 part_header
->next_ofs
,
134 part_header
->min_index
,
135 part_header
->max_index
,
141 int write_part_header(cls_method_context_t hctx
,
142 part_header
& part_header
)
144 ceph::buffer::list bl
;
145 encode(part_header
, bl
);
147 if (bl
.length() > CLS_FIFO_MAX_PART_HEADER_SIZE
) {
148 CLS_ERR("%s: cannot write part header, buffer exceeds max size", __PRETTY_FUNCTION__
);
152 int r
= cls_cxx_write2(hctx
, 0, bl
.length(),
153 &bl
, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED
);
155 CLS_ERR("%s: failed to write part header: r=%d",
156 __PRETTY_FUNCTION__
, r
);
163 int read_header(cls_method_context_t hctx
,
164 std::optional
<objv
> objv
,
165 info
* info
, bool get_info
= false)
169 int r
= cls_cxx_stat2(hctx
, &size
, nullptr);
171 CLS_ERR("ERROR: %s: cls_cxx_stat2() on obj returned %d", __PRETTY_FUNCTION__
, r
);
175 ceph::buffer::list bl
;
176 r
= cls_cxx_read2(hctx
, 0, size
, &bl
, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED
);
178 CLS_ERR("ERROR: %s: cls_cxx_read2() on obj returned %d", __PRETTY_FUNCTION__
, r
);
184 CLS_LOG(5, "%s: Zero length object, likely probe, returning ENODATA", __PRETTY_FUNCTION__
);
186 CLS_ERR("ERROR: %s: Zero length object, returning ENODATA", __PRETTY_FUNCTION__
);
192 auto iter
= bl
.cbegin();
194 } catch (const ceph::buffer::error
& err
) {
195 CLS_ERR("ERROR: %s: failed decoding header", __PRETTY_FUNCTION__
);
199 if (objv
&& !(info
->version
== *objv
)) {
200 auto s1
= info
->version
.to_str();
201 auto s2
= objv
->to_str();
202 CLS_ERR("%s: version mismatch (header=%s, req=%s), canceled operation",
203 __PRETTY_FUNCTION__
, s1
.c_str(), s2
.c_str());
210 int create_meta(cls_method_context_t hctx
,
211 ceph::buffer::list
* in
, ceph::buffer::list
* out
)
213 CLS_LOG(5, "%s", __PRETTY_FUNCTION__
);
217 auto iter
= in
->cbegin();
219 } catch (const ceph::buffer::error
& err
) {
220 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__
);
225 CLS_ERR("%s: ID cannot be empty", __PRETTY_FUNCTION__
);
229 if (op
.max_part_size
== 0 ||
230 op
.max_entry_size
== 0 ||
231 op
.max_entry_size
> op
.max_part_size
) {
232 CLS_ERR("ERROR: %s: invalid dimensions.", __PRETTY_FUNCTION__
);
238 int r
= cls_cxx_stat2(hctx
, &size
, nullptr);
239 if (r
< 0 && r
!= -ENOENT
) {
240 CLS_ERR("ERROR: %s: cls_cxx_stat2() on obj returned %d", __PRETTY_FUNCTION__
, r
);
243 if (op
.exclusive
&& r
== 0) {
244 CLS_ERR("%s: exclusive create but queue already exists", __PRETTY_FUNCTION__
);
249 ceph::buffer::list bl
;
250 r
= cls_cxx_read2(hctx
, 0, size
, &bl
, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED
);
252 CLS_ERR("ERROR: %s: cls_cxx_read2() on obj returned %d", __PRETTY_FUNCTION__
, r
);
258 auto iter
= bl
.cbegin();
259 decode(header
, iter
);
260 } catch (const ceph::buffer::error
& err
) {
261 CLS_ERR("ERROR: %s: failed decoding header", __PRETTY_FUNCTION__
);
265 if (!(header
.id
== op
.id
&&
267 header
.oid_prefix
== *op
.oid_prefix
) &&
269 header
.version
== *op
.version
))) {
270 CLS_ERR("%s: failed to re-create existing queue "
271 "with different params", __PRETTY_FUNCTION__
);
275 return 0; /* already exists */
281 header
.version
= *op
.version
;
283 static constexpr auto DEFAULT_INSTANCE_SIZE
= 16;
284 char buf
[DEFAULT_INSTANCE_SIZE
+ 1];
285 cls_gen_rand_base64(buf
, sizeof(buf
));
286 buf
[DEFAULT_INSTANCE_SIZE
] = '\0';
287 header
.version
.instance
= buf
;
288 header
.version
.ver
= 1;
290 header
.oid_prefix
= new_oid_prefix(op
.id
, op
.oid_prefix
);
292 header
.params
.max_part_size
= op
.max_part_size
;
293 header
.params
.max_entry_size
= op
.max_entry_size
;
294 header
.params
.full_size_threshold
= op
.max_part_size
- op
.max_entry_size
- part_entry_overhead
;
296 r
= write_header(hctx
, header
, false);
298 CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__
, r
);
305 int update_meta(cls_method_context_t hctx
, ceph::buffer::list
* in
,
306 ceph::buffer::list
* out
)
308 CLS_LOG(5, "%s", __PRETTY_FUNCTION__
);
312 auto iter
= in
->cbegin();
314 } catch (const ceph::buffer::error
& err
) {
315 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__
);
319 if (op
.version
.empty()) {
320 CLS_ERR("%s: no version supplied", __PRETTY_FUNCTION__
);
326 int r
= read_header(hctx
, op
.version
, &header
);
331 auto u
= fifo::update().tail_part_num(op
.tail_part_num
)
332 .head_part_num(op
.head_part_num
)
333 .min_push_part_num(op
.min_push_part_num
)
334 .max_push_part_num(op
.max_push_part_num
)
335 .journal_entries_add(
336 std::move(op
.journal_entries_add
))
338 std::move(op
.journal_entries_rm
));
340 auto err
= header
.apply_update(u
);
342 std::ostringstream ss
;
344 CLS_ERR("%s: %s: %s", __PRETTY_FUNCTION__
, err
->c_str(),
349 r
= write_header(hctx
, header
);
351 CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__
, r
);
358 int get_meta(cls_method_context_t hctx
, ceph::buffer::list
* in
,
359 ceph::buffer::list
* out
)
361 CLS_LOG(5, "%s", __PRETTY_FUNCTION__
);
365 auto iter
= in
->cbegin();
367 } catch (const ceph::buffer::error
&err
) {
368 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__
);
372 op::get_meta_reply reply
;
373 int r
= read_header(hctx
, op
.version
, &reply
.info
, true);
378 reply
.part_header_size
= CLS_FIFO_MAX_PART_HEADER_SIZE
;
379 reply
.part_entry_overhead
= part_entry_overhead
;
386 int init_part(cls_method_context_t hctx
, ceph::buffer::list
* in
,
387 ceph::buffer::list
*out
)
389 CLS_LOG(5, "%s", __PRETTY_FUNCTION__
);
393 auto iter
= in
->cbegin();
395 } catch (const ceph::buffer::error
&err
) {
396 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__
);
402 if (op
.tag
.empty()) {
403 CLS_ERR("%s: tag required", __PRETTY_FUNCTION__
);
407 int r
= cls_cxx_stat2(hctx
, &size
, nullptr);
408 if (r
< 0 && r
!= -ENOENT
) {
409 CLS_ERR("ERROR: %s: cls_cxx_stat2() on obj returned %d", __PRETTY_FUNCTION__
, r
);
412 if (r
== 0 && size
> 0) {
413 part_header part_header
;
414 r
= read_part_header(hctx
, &part_header
);
416 CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__
);
420 if (!(part_header
.tag
== op
.tag
&&
421 part_header
.params
== op
.params
)) {
422 CLS_ERR("%s: failed to re-create existing part with different "
423 "params", __PRETTY_FUNCTION__
);
427 return 0; /* already exists */
430 part_header part_header
;
432 part_header
.tag
= op
.tag
;
433 part_header
.params
= op
.params
;
435 part_header
.min_ofs
= CLS_FIFO_MAX_PART_HEADER_SIZE
;
436 part_header
.last_ofs
= 0;
437 part_header
.next_ofs
= part_header
.min_ofs
;
438 part_header
.max_time
= ceph::real_clock::now();
440 cls_gen_random_bytes(reinterpret_cast<char *>(&part_header
.magic
),
441 sizeof(part_header
.magic
));
443 r
= write_part_header(hctx
, part_header
);
445 CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__
, r
);
452 bool full_part(const part_header
& part_header
)
454 return (part_header
.next_ofs
> part_header
.params
.full_size_threshold
);
457 int push_part(cls_method_context_t hctx
, ceph::buffer::list
* in
,
458 ceph::buffer::list
* out
)
460 CLS_LOG(5, "%s", __PRETTY_FUNCTION__
);
464 auto iter
= in
->cbegin();
466 } catch (const ceph::buffer::error
& err
) {
467 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__
);
471 if (op
.tag
.empty()) {
472 CLS_ERR("%s: tag required", __PRETTY_FUNCTION__
);
476 part_header part_header
;
477 int r
= read_part_header(hctx
, &part_header
);
479 CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__
);
483 if (!(part_header
.tag
== op
.tag
)) {
484 CLS_ERR("%s: bad tag", __PRETTY_FUNCTION__
);
488 std::uint64_t effective_len
= op
.total_len
+ op
.data_bufs
.size() *
491 if (effective_len
> part_header
.params
.max_part_size
) {
495 if (full_part(part_header
)) {
499 auto now
= ceph::real_clock::now();
500 struct entry_header entry_header
= { now
};
501 ceph::buffer::list entry_header_bl
;
502 encode(entry_header
, entry_header_bl
);
504 auto max_index
= part_header
.max_index
;
505 const auto write_ofs
= part_header
.next_ofs
;
506 auto ofs
= part_header
.next_ofs
;
508 entry_header_pre pre_header
;
509 pre_header
.magic
= part_header
.magic
;
510 pre_header
.pre_size
= sizeof(pre_header
);
511 pre_header
.reserved
= 0;
513 std::uint64_t total_data
= 0;
514 for (auto& data
: op
.data_bufs
) {
515 total_data
+= data
.length();
517 if (total_data
!= op
.total_len
) {
518 CLS_ERR("%s: length mismatch: op.total_len=%" PRId64
519 " total data received=%" PRId64
,
520 __PRETTY_FUNCTION__
, op
.total_len
, total_data
);
525 int entries_pushed
= 0;
526 ceph::buffer::list all_data
;
527 for (auto& data
: op
.data_bufs
) {
528 if (full_part(part_header
))
531 pre_header
.header_size
= entry_header_bl
.length();
532 pre_header
.data_size
= data
.length();
533 pre_header
.index
= max_index
;
535 bufferptr
pre(reinterpret_cast<char*>(&pre_header
), sizeof(pre_header
));
536 auto entry_write_len
= pre
.length() + entry_header_bl
.length() + data
.length();
537 all_data
.append(pre
);
538 all_data
.append(entry_header_bl
);
539 all_data
.claim_append(data
);
541 part_header
.last_ofs
= ofs
;
542 ofs
+= entry_write_len
;
545 part_header
.max_index
= max_index
;
546 part_header
.next_ofs
= ofs
;
548 part_header
.max_time
= now
;
550 auto write_len
= all_data
.length();
552 r
= cls_cxx_write2(hctx
, write_ofs
, write_len
,
553 &all_data
, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED
);
556 CLS_ERR("%s: failed to write entries (ofs=%" PRIu64
557 " len=%u): r=%d", __PRETTY_FUNCTION__
, write_ofs
,
563 r
= write_part_header(hctx
, part_header
);
565 CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__
, r
);
569 if (entries_pushed
== 0) {
570 CLS_ERR("%s: pushed no entries? Can't happen!", __PRETTY_FUNCTION__
);
574 return entries_pushed
;
578 static constexpr std::uint64_t prefetch_len
= (128 * 1024);
580 cls_method_context_t hctx
;
582 const fifo::part_header
& part_header
;
585 ceph::buffer::list data
;
587 int fetch(std::uint64_t num_bytes
);
588 int read(std::uint64_t num_bytes
, ceph::buffer::list
* pbl
);
589 int peek(std::uint64_t num_bytes
, char *dest
);
590 int seek(std::uint64_t num_bytes
);
593 EntryReader(cls_method_context_t hctx
,
594 const fifo::part_header
& part_header
,
595 uint64_t ofs
) : hctx(hctx
),
596 part_header(part_header
),
597 ofs(ofs
< part_header
.min_ofs
?
598 part_header
.min_ofs
:
601 std::uint64_t get_ofs() const {
606 return (ofs
>= part_header
.next_ofs
);
609 int peek_pre_header(entry_header_pre
* pre_header
);
610 int get_next_entry(ceph::buffer::list
* pbl
,
612 ceph::real_time
* pmtime
);
616 int EntryReader::fetch(std::uint64_t num_bytes
)
618 CLS_LOG(5, "%s: fetch %d bytes, ofs=%d data.length()=%d", __PRETTY_FUNCTION__
, (int)num_bytes
, (int)ofs
, (int)data
.length());
619 if (data
.length() < num_bytes
) {
620 ceph::buffer::list bl
;
621 CLS_LOG(5, "%s: reading % " PRId64
" bytes at ofs=%" PRId64
, __PRETTY_FUNCTION__
,
622 prefetch_len
, ofs
+ data
.length());
623 int r
= cls_cxx_read2(hctx
, ofs
+ data
.length(), prefetch_len
, &bl
, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED
);
625 CLS_ERR("ERROR: %s: cls_cxx_read2() on obj returned %d", __PRETTY_FUNCTION__
, r
);
628 data
.claim_append(bl
);
631 if (static_cast<unsigned>(num_bytes
) > data
.length()) {
632 CLS_ERR("%s: requested %" PRId64
" bytes, but only "
633 "%u were available", __PRETTY_FUNCTION__
, num_bytes
, data
.length());
640 int EntryReader::read(std::uint64_t num_bytes
, ceph::buffer::list
* pbl
)
642 int r
= fetch(num_bytes
);
646 data
.splice(0, num_bytes
, pbl
);
653 int EntryReader::peek(std::uint64_t num_bytes
, char* dest
)
655 int r
= fetch(num_bytes
);
660 data
.begin().copy(num_bytes
, dest
);
665 int EntryReader::seek(std::uint64_t num_bytes
)
667 ceph::buffer::list bl
;
669 CLS_LOG(5, "%s:%d: num_bytes=%" PRIu64
, __PRETTY_FUNCTION__
, __LINE__
, num_bytes
);
670 return read(num_bytes
, &bl
);
673 int EntryReader::peek_pre_header(entry_header_pre
* pre_header
)
679 int r
= peek(sizeof(*pre_header
),
680 reinterpret_cast<char*>(pre_header
));
682 CLS_ERR("ERROR: %s: peek() size=%zu failed: r=%d", __PRETTY_FUNCTION__
,
683 sizeof(pre_header
), r
);
687 if (pre_header
->magic
!= part_header
.magic
) {
688 CLS_ERR("ERROR: %s: unexpected pre_header magic", __PRETTY_FUNCTION__
);
696 int EntryReader::get_next_entry(ceph::buffer::list
* pbl
,
698 ceph::real_time
* pmtime
)
700 entry_header_pre pre_header
;
701 int r
= peek_pre_header(&pre_header
);
703 CLS_ERR("ERROR: %s: peek_pre_header() failed: r=%d", __PRETTY_FUNCTION__
, r
);
711 CLS_LOG(5, "%s:%d: pre_header.pre_size=%" PRIu64
, __PRETTY_FUNCTION__
, __LINE__
,
712 uint64_t(pre_header
.pre_size
));
713 r
= seek(pre_header
.pre_size
);
715 CLS_ERR("ERROR: %s: failed to seek: r=%d", __PRETTY_FUNCTION__
, r
);
719 ceph::buffer::list header
;
720 CLS_LOG(5, "%s:%d: pre_header.header_size=%d", __PRETTY_FUNCTION__
, __LINE__
, (int)pre_header
.header_size
);
721 r
= read(pre_header
.header_size
, &header
);
723 CLS_ERR("ERROR: %s: failed to read entry header: r=%d", __PRETTY_FUNCTION__
, r
);
727 entry_header entry_header
;
728 auto iter
= header
.cbegin();
730 decode(entry_header
, iter
);
731 } catch (ceph::buffer::error
& err
) {
732 CLS_ERR("%s: failed decoding entry header", __PRETTY_FUNCTION__
);
737 *pmtime
= entry_header
.mtime
;
741 r
= read(pre_header
.data_size
, pbl
);
743 CLS_ERR("%s: failed reading data: r=%d", __PRETTY_FUNCTION__
, r
);
747 r
= seek(pre_header
.data_size
);
749 CLS_ERR("ERROR: %s: failed to seek: r=%d", __PRETTY_FUNCTION__
, r
);
757 int trim_part(cls_method_context_t hctx
,
758 ceph::buffer::list
*in
, ceph::buffer::list
*out
)
760 CLS_LOG(5, "%s", __PRETTY_FUNCTION__
);
764 auto iter
= in
->cbegin();
766 } catch (const ceph::buffer::error
&err
) {
767 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__
);
771 part_header part_header
;
772 int r
= read_part_header(hctx
, &part_header
);
774 CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__
);
779 !(part_header
.tag
== *op
.tag
)) {
780 CLS_ERR("%s: bad tag", __PRETTY_FUNCTION__
);
784 if (op
.ofs
< part_header
.min_ofs
) {
787 if (op
.exclusive
&& op
.ofs
== part_header
.min_ofs
) {
791 if (op
.ofs
>= part_header
.next_ofs
) {
792 if (full_part(part_header
)) {
794 * trim full part completely: remove object
797 r
= cls_cxx_remove(hctx
);
799 CLS_ERR("%s: ERROR: cls_cxx_remove() returned r=%d", __PRETTY_FUNCTION__
, r
);
806 part_header
.min_ofs
= part_header
.next_ofs
;
807 part_header
.min_index
= part_header
.max_index
;
809 EntryReader
reader(hctx
, part_header
, op
.ofs
);
811 entry_header_pre pre_header
;
812 int r
= reader
.peek_pre_header(&pre_header
);
818 part_header
.min_index
= pre_header
.index
;
820 r
= reader
.get_next_entry(nullptr, nullptr, nullptr);
822 CLS_ERR("ERROR: %s: unexpected failure at get_next_entry: r=%d",
823 __PRETTY_FUNCTION__
, r
);
826 part_header
.min_index
= pre_header
.index
+ 1;
829 part_header
.min_ofs
= reader
.get_ofs();
832 r
= write_part_header(hctx
, part_header
);
834 CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__
, r
);
841 int list_part(cls_method_context_t hctx
, ceph::buffer::list
* in
,
842 ceph::buffer::list
* out
)
844 CLS_LOG(5, "%s", __PRETTY_FUNCTION__
);
848 auto iter
= in
->cbegin();
850 } catch (const buffer::error
&err
) {
851 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__
);
855 part_header part_header
;
856 int r
= read_part_header(hctx
, &part_header
);
858 CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__
);
863 !(part_header
.tag
== *op
.tag
)) {
864 CLS_ERR("%s: bad tag", __PRETTY_FUNCTION__
);
868 EntryReader
reader(hctx
, part_header
, op
.ofs
);
870 if (op
.ofs
>= part_header
.min_ofs
&&
872 r
= reader
.get_next_entry(nullptr, nullptr, nullptr);
874 CLS_ERR("ERROR: %s: unexpected failure at get_next_entry: r=%d", __PRETTY_FUNCTION__
, r
);
879 op::list_part_reply reply
;
881 reply
.tag
= part_header
.tag
;
883 auto max_entries
= std::min(op
.max_entries
, op::MAX_LIST_ENTRIES
);
885 for (int i
= 0; i
< max_entries
&& !reader
.end(); ++i
) {
886 ceph::buffer::list data
;
887 ceph::real_time mtime
;
890 r
= reader
.get_next_entry(&data
, &ofs
, &mtime
);
892 CLS_ERR("ERROR: %s: unexpected failure at get_next_entry: r=%d",
893 __PRETTY_FUNCTION__
, r
);
897 reply
.entries
.emplace_back(std::move(data
), ofs
, mtime
);
900 reply
.more
= !reader
.end();
901 reply
.full_part
= full_part(part_header
);
908 int get_part_info(cls_method_context_t hctx
, ceph::buffer::list
*in
,
909 ceph::buffer::list
*out
)
911 CLS_LOG(5, "%s", __PRETTY_FUNCTION__
);
913 op::get_part_info op
;
915 auto iter
= in
->cbegin();
917 } catch (const ceph::buffer::error
&err
) {
918 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__
);
922 op::get_part_info_reply reply
;
924 int r
= read_part_header(hctx
, &reply
.header
);
926 CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__
);
935 } // namespace rados::cls::fifo
939 using namespace rados::cls::fifo
;
940 CLS_LOG(10, "Loaded fifo class!");
942 cls_handle_t h_class
;
943 cls_method_handle_t h_create_meta
;
944 cls_method_handle_t h_get_meta
;
945 cls_method_handle_t h_update_meta
;
946 cls_method_handle_t h_init_part
;
947 cls_method_handle_t h_push_part
;
948 cls_method_handle_t h_trim_part
;
949 cls_method_handle_t h_list_part
;
950 cls_method_handle_t h_get_part_info
;
952 cls_register(op::CLASS
, &h_class
);
953 cls_register_cxx_method(h_class
, op::CREATE_META
,
954 CLS_METHOD_RD
| CLS_METHOD_WR
,
955 create_meta
, &h_create_meta
);
957 cls_register_cxx_method(h_class
, op::GET_META
,
959 get_meta
, &h_get_meta
);
961 cls_register_cxx_method(h_class
, op::UPDATE_META
,
962 CLS_METHOD_RD
| CLS_METHOD_WR
,
963 update_meta
, &h_update_meta
);
965 cls_register_cxx_method(h_class
, op::INIT_PART
,
966 CLS_METHOD_RD
| CLS_METHOD_WR
,
967 init_part
, &h_init_part
);
969 cls_register_cxx_method(h_class
, op::PUSH_PART
,
970 CLS_METHOD_RD
| CLS_METHOD_WR
,
971 push_part
, &h_push_part
);
973 cls_register_cxx_method(h_class
, op::TRIM_PART
,
974 CLS_METHOD_RD
| CLS_METHOD_WR
,
975 trim_part
, &h_trim_part
);
977 cls_register_cxx_method(h_class
, op::LIST_PART
,
979 list_part
, &h_list_part
);
981 cls_register_cxx_method(h_class
, op::GET_PART_INFO
,
983 get_part_info
, &h_get_part_info
);
985 /* calculate entry overhead */
986 struct entry_header entry_header
;
987 ceph::buffer::list entry_header_bl
;
988 encode(entry_header
, entry_header_bl
);
990 part_entry_overhead
= sizeof(entry_header_pre
) + entry_header_bl
.length();