]> git.proxmox.com Git - ceph.git/blob - ceph/src/cls/fifo/cls_fifo.cc
fc89a20e6b2bfd310e65757f7dc617d4229e9e7c
[ceph.git] / ceph / src / cls / fifo / cls_fifo.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 /** \file
5 *
6 * This is an OSD object class that implements methods for creating,
7 * managing, and using FIFO queues.
8 *
9 */
10
11 #include <cerrno>
12 #include <optional>
13 #include <string>
14
15 #undef FMT_HEADER_ONLY
16 #define FMT_HEADER_ONLY 1
17 #include <fmt/format.h>
18
19 #include "include/buffer.h"
20 #include "include/types.h"
21
22 #include "objclass/objclass.h"
23
24 #include "cls/fifo/cls_fifo_ops.h"
25 #include "cls/fifo/cls_fifo_types.h"
26
// Object-class identification via the objclass.h macros: declares this
// class's version (1.0) and its name ("fifo").
27 CLS_VER(1,0)
28 CLS_NAME(fifo)
29
30 namespace rados::cls::fifo {
31
// Bytes reserved at the start of every part object for its encoded
// part_header. The first entry is stored right after this region
// (init_part sets min_ofs to this value), and write_part_header refuses
// to write a header whose encoding would not fit in it.
static constexpr int CLS_FIFO_MAX_PART_HEADER_SIZE{512};

// Storage cost of one entry on top of its payload: sizeof(entry_header_pre)
// plus the encoded length of an entry_header. Zero until CLS_INIT computes
// it when the class is loaded.
static std::uint32_t part_entry_overhead = 0;
35
/*
 * Fixed-size prefix stored in front of every entry in a part object.
 *
 * push_part fills it in and copies its raw bytes into the object;
 * EntryReader::peek_pre_header copies the raw bytes back out. Because the
 * struct is packed and built from explicit little-endian field types, its
 * byte layout is the on-disk format: do not reorder, resize or add fields.
 */
36 struct entry_header_pre {
// Copy of the owning part's random magic; readers reject an entry whose
// magic does not match the part header's.
37 ceph_le64 magic;
// sizeof(entry_header_pre) at write time; readers skip this many bytes.
38 ceph_le64 pre_size;
// Length of the encoded entry_header that follows this prefix.
39 ceph_le64 header_size;
// Length of the entry payload that follows the entry_header.
40 ceph_le64 data_size;
// Entry index, taken from the part header's max_index when written.
41 ceph_le64 index;
// Not interpreted by this file; push_part writes it as zero.
42 ceph_le32 reserved;
43 } __attribute__ ((packed));
44
/*
 * Versioned per-entry header, encoded right after entry_header_pre.
 * It currently carries only the time at which the entry was pushed.
 */
45 struct entry_header {
// Set by push_part to ceph::real_clock::now() at push time.
46 ceph::real_time mtime;
47
// Encoding version 1, compat 1. The encoded length also feeds the
// per-entry overhead computed in CLS_INIT, so changing the fields changes
// both the on-disk format and that overhead.
48 void encode(ceph::buffer::list& bl) const {
49 ENCODE_START(1, 1, bl);
50 encode(mtime, bl);
51 ENCODE_FINISH(bl);
52 }
// Decodes the version-1 encoding produced by encode().
53 void decode(ceph::buffer::list::const_iterator& bl) {
54 DECODE_START(1, bl);
55 decode(mtime, bl);
56 DECODE_FINISH(bl);
57 }
58 };
// Supplies the free encode()/decode() overloads used on entry_header
// elsewhere in this file (e.g. in push_part and CLS_INIT).
59 WRITE_CLASS_ENCODER(entry_header)
60
61 namespace {
62
63 std::string new_oid_prefix(std::string id, std::optional<std::string>& val)
64 {
65 static constexpr auto PREFIX_RND_SIZE = 12;
66 if (val) {
67 return *val;
68 }
69
70 char buf[PREFIX_RND_SIZE + 1];
71 buf[PREFIX_RND_SIZE] = 0;
72
73 cls_gen_rand_base64(buf, sizeof(buf) - 1);
74
75 return fmt::format("{}.{}", id, buf);
76 }
77
78 int write_header(cls_method_context_t hctx,
79 info& header,
80 bool inc_ver = true)
81 {
82 static constexpr auto HEADER_INSTANCE_SIZE = 16;
83 if (header.version.instance.empty()) {
84 char buf[HEADER_INSTANCE_SIZE + 1];
85 buf[HEADER_INSTANCE_SIZE] = 0;
86 cls_gen_rand_base64(buf, sizeof(buf) - 1);
87 header.version.instance = buf;
88 }
89 if (inc_ver) {
90 ++header.version.ver;
91 }
92 ceph::buffer::list bl;
93 encode(header, bl);
94 return cls_cxx_write_full(hctx, &bl);
95 }
96
97 int read_part_header(cls_method_context_t hctx,
98 part_header* part_header)
99 {
100 ceph::buffer::list bl;
101 int r = cls_cxx_read2(hctx, 0, CLS_FIFO_MAX_PART_HEADER_SIZE, &bl,
102 CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
103 if (r < 0) {
104 CLS_ERR("ERROR: %s: cls_cxx_read2() on obj returned %d", __PRETTY_FUNCTION__, r);
105 return r;
106 }
107
108 auto iter = bl.cbegin();
109 try {
110 decode(*part_header, iter);
111 } catch (const ceph::buffer::error& err) {
112 CLS_ERR("ERROR: %s: failed decoding part header", __PRETTY_FUNCTION__);
113 return -EIO;
114 }
115
116 using ceph::operator <<;
117 std::ostringstream ss;
118 ss << part_header->max_time;
119 CLS_LOG(5, "%s:%d read part_header:\n"
120 "\ttag=%s\n"
121 "\tmagic=0x%" PRIx64 "\n"
122 "\tmin_ofs=%" PRId64 "\n"
123 "\tlast_ofs=%" PRId64 "\n"
124 "\tnext_ofs=%" PRId64 "\n"
125 "\tmin_index=%" PRId64 "\n"
126 "\tmax_index=%" PRId64 "\n"
127 "\tmax_time=%s\n",
128 __PRETTY_FUNCTION__, __LINE__,
129 part_header->tag.c_str(),
130 part_header->magic,
131 part_header->min_ofs,
132 part_header->last_ofs,
133 part_header->next_ofs,
134 part_header->min_index,
135 part_header->max_index,
136 ss.str().c_str());
137
138 return 0;
139 }
140
141 int write_part_header(cls_method_context_t hctx,
142 part_header& part_header)
143 {
144 ceph::buffer::list bl;
145 encode(part_header, bl);
146
147 if (bl.length() > CLS_FIFO_MAX_PART_HEADER_SIZE) {
148 CLS_ERR("%s: cannot write part header, buffer exceeds max size", __PRETTY_FUNCTION__);
149 return -EIO;
150 }
151
152 int r = cls_cxx_write2(hctx, 0, bl.length(),
153 &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
154 if (r < 0) {
155 CLS_ERR("%s: failed to write part header: r=%d",
156 __PRETTY_FUNCTION__, r);
157 return r;
158 }
159
160 return 0;
161 }
162
163 int read_header(cls_method_context_t hctx,
164 std::optional<objv> objv,
165 info* info, bool get_info = false)
166 {
167 std::uint64_t size;
168
169 int r = cls_cxx_stat2(hctx, &size, nullptr);
170 if (r < 0) {
171 CLS_ERR("ERROR: %s: cls_cxx_stat2() on obj returned %d", __PRETTY_FUNCTION__, r);
172 return r;
173 }
174
175 ceph::buffer::list bl;
176 r = cls_cxx_read2(hctx, 0, size, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
177 if (r < 0) {
178 CLS_ERR("ERROR: %s: cls_cxx_read2() on obj returned %d", __PRETTY_FUNCTION__, r);
179 return r;
180 }
181
182 if (r == 0) {
183 if (get_info) {
184 CLS_LOG(5, "%s: Zero length object, likely probe, returning ENODATA", __PRETTY_FUNCTION__);
185 } else {
186 CLS_ERR("ERROR: %s: Zero length object, returning ENODATA", __PRETTY_FUNCTION__);
187 }
188 return -ENODATA;
189 }
190
191 try {
192 auto iter = bl.cbegin();
193 decode(*info, iter);
194 } catch (const ceph::buffer::error& err) {
195 CLS_ERR("ERROR: %s: failed decoding header", __PRETTY_FUNCTION__);
196 return -EIO;
197 }
198
199 if (objv && !(info->version== *objv)) {
200 auto s1 = info->version.to_str();
201 auto s2 = objv->to_str();
202 CLS_ERR("%s: version mismatch (header=%s, req=%s), canceled operation",
203 __PRETTY_FUNCTION__, s1.c_str(), s2.c_str());
204 return -ECANCELED;
205 }
206
207 return 0;
208 }
209
210 int create_meta(cls_method_context_t hctx,
211 ceph::buffer::list* in, ceph::buffer::list* out)
212 {
213 CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
214
215 op::create_meta op;
216 try {
217 auto iter = in->cbegin();
218 decode(op, iter);
219 } catch (const ceph::buffer::error& err) {
220 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__);
221 return -EINVAL;
222 }
223
224 if (op.id.empty()) {
225 CLS_ERR("%s: ID cannot be empty", __PRETTY_FUNCTION__);
226 return -EINVAL;
227 }
228
229 if (op.max_part_size == 0 ||
230 op.max_entry_size == 0 ||
231 op.max_entry_size > op.max_part_size) {
232 CLS_ERR("ERROR: %s: invalid dimensions.", __PRETTY_FUNCTION__);
233 return -EINVAL;
234 }
235
236 std::uint64_t size;
237
238 int r = cls_cxx_stat2(hctx, &size, nullptr);
239 if (r < 0 && r != -ENOENT) {
240 CLS_ERR("ERROR: %s: cls_cxx_stat2() on obj returned %d", __PRETTY_FUNCTION__, r);
241 return r;
242 }
243 if (op.exclusive && r == 0) {
244 CLS_ERR("%s: exclusive create but queue already exists", __PRETTY_FUNCTION__);
245 return -EEXIST;
246 }
247
248 if (r == 0) {
249 ceph::buffer::list bl;
250 r = cls_cxx_read2(hctx, 0, size, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
251 if (r < 0) {
252 CLS_ERR("ERROR: %s: cls_cxx_read2() on obj returned %d", __PRETTY_FUNCTION__, r);
253 return r;
254 }
255
256 info header;
257 try {
258 auto iter = bl.cbegin();
259 decode(header, iter);
260 } catch (const ceph::buffer::error& err) {
261 CLS_ERR("ERROR: %s: failed decoding header", __PRETTY_FUNCTION__);
262 return -EIO;
263 }
264
265 if (!(header.id == op.id &&
266 (!op.oid_prefix ||
267 header.oid_prefix == *op.oid_prefix) &&
268 (!op.version ||
269 header.version == *op.version))) {
270 CLS_ERR("%s: failed to re-create existing queue "
271 "with different params", __PRETTY_FUNCTION__);
272 return -EEXIST;
273 }
274
275 return 0; /* already exists */
276 }
277 info header;
278
279 header.id = op.id;
280 if (op.version) {
281 header.version = *op.version;
282 } else {
283 static constexpr auto DEFAULT_INSTANCE_SIZE = 16;
284 char buf[DEFAULT_INSTANCE_SIZE + 1];
285 cls_gen_rand_base64(buf, sizeof(buf));
286 buf[DEFAULT_INSTANCE_SIZE] = '\0';
287 header.version.instance = buf;
288 header.version.ver = 1;
289 }
290 header.oid_prefix = new_oid_prefix(op.id, op.oid_prefix);
291
292 header.params.max_part_size = op.max_part_size;
293 header.params.max_entry_size = op.max_entry_size;
294 header.params.full_size_threshold = op.max_part_size - op.max_entry_size - part_entry_overhead;
295
296 r = write_header(hctx, header, false);
297 if (r < 0) {
298 CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__, r);
299 return r;
300 }
301
302 return 0;
303 }
304
305 int update_meta(cls_method_context_t hctx, ceph::buffer::list* in,
306 ceph::buffer::list* out)
307 {
308 CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
309
310 op::update_meta op;
311 try {
312 auto iter = in->cbegin();
313 decode(op, iter);
314 } catch (const ceph::buffer::error& err) {
315 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__);
316 return -EINVAL;
317 }
318
319 if (op.version.empty()) {
320 CLS_ERR("%s: no version supplied", __PRETTY_FUNCTION__);
321 return -EINVAL;
322 }
323
324 info header;
325
326 int r = read_header(hctx, op.version, &header);
327 if (r < 0) {
328 return r;
329 }
330
331 auto u = fifo::update().tail_part_num(op.tail_part_num)
332 .head_part_num(op.head_part_num)
333 .min_push_part_num(op.min_push_part_num)
334 .max_push_part_num(op.max_push_part_num)
335 .journal_entries_add(
336 std::move(op.journal_entries_add))
337 .journal_entries_rm(
338 std::move(op.journal_entries_rm));
339
340 auto err = header.apply_update(u);
341 if (err) {
342 std::ostringstream ss;
343 ss << u;
344 CLS_ERR("%s: %s: %s", __PRETTY_FUNCTION__, err->c_str(),
345 ss.str().c_str());
346 return -EINVAL;
347 }
348
349 r = write_header(hctx, header);
350 if (r < 0) {
351 CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__, r);
352 return r;
353 }
354
355 return 0;
356 }
357
358 int get_meta(cls_method_context_t hctx, ceph::buffer::list* in,
359 ceph::buffer::list* out)
360 {
361 CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
362
363 op::get_meta op;
364 try {
365 auto iter = in->cbegin();
366 decode(op, iter);
367 } catch (const ceph::buffer::error &err) {
368 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__);
369 return -EINVAL;
370 }
371
372 op::get_meta_reply reply;
373 int r = read_header(hctx, op.version, &reply.info, true);
374 if (r < 0) {
375 return r;
376 }
377
378 reply.part_header_size = CLS_FIFO_MAX_PART_HEADER_SIZE;
379 reply.part_entry_overhead = part_entry_overhead;
380
381 encode(reply, *out);
382
383 return 0;
384 }
385
386 int init_part(cls_method_context_t hctx, ceph::buffer::list* in,
387 ceph::buffer::list *out)
388 {
389 CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
390
391 op::init_part op;
392 try {
393 auto iter = in->cbegin();
394 decode(op, iter);
395 } catch (const ceph::buffer::error &err) {
396 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__);
397 return -EINVAL;
398 }
399
400 std::uint64_t size;
401
402 if (op.tag.empty()) {
403 CLS_ERR("%s: tag required", __PRETTY_FUNCTION__);
404 return -EINVAL;
405 }
406
407 int r = cls_cxx_stat2(hctx, &size, nullptr);
408 if (r < 0 && r != -ENOENT) {
409 CLS_ERR("ERROR: %s: cls_cxx_stat2() on obj returned %d", __PRETTY_FUNCTION__, r);
410 return r;
411 }
412 if (r == 0 && size > 0) {
413 part_header part_header;
414 r = read_part_header(hctx, &part_header);
415 if (r < 0) {
416 CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__);
417 return r;
418 }
419
420 if (!(part_header.tag == op.tag &&
421 part_header.params == op.params)) {
422 CLS_ERR("%s: failed to re-create existing part with different "
423 "params", __PRETTY_FUNCTION__);
424 return -EEXIST;
425 }
426
427 return 0; /* already exists */
428 }
429
430 part_header part_header;
431
432 part_header.tag = op.tag;
433 part_header.params = op.params;
434
435 part_header.min_ofs = CLS_FIFO_MAX_PART_HEADER_SIZE;
436 part_header.last_ofs = 0;
437 part_header.next_ofs = part_header.min_ofs;
438 part_header.max_time = ceph::real_clock::now();
439
440 cls_gen_random_bytes(reinterpret_cast<char *>(&part_header.magic),
441 sizeof(part_header.magic));
442
443 r = write_part_header(hctx, part_header);
444 if (r < 0) {
445 CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__, r);
446 return r;
447 }
448
449 return 0;
450 }
451
452 bool full_part(const part_header& part_header)
453 {
454 return (part_header.next_ofs > part_header.params.full_size_threshold);
455 }
456
457 int push_part(cls_method_context_t hctx, ceph::buffer::list* in,
458 ceph::buffer::list* out)
459 {
460 CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
461
462 op::push_part op;
463 try {
464 auto iter = in->cbegin();
465 decode(op, iter);
466 } catch (const ceph::buffer::error& err) {
467 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__);
468 return -EINVAL;
469 }
470
471 if (op.tag.empty()) {
472 CLS_ERR("%s: tag required", __PRETTY_FUNCTION__);
473 return -EINVAL;
474 }
475
476 part_header part_header;
477 int r = read_part_header(hctx, &part_header);
478 if (r < 0) {
479 CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__);
480 return r;
481 }
482
483 if (!(part_header.tag == op.tag)) {
484 CLS_ERR("%s: bad tag", __PRETTY_FUNCTION__);
485 return -EINVAL;
486 }
487
488 std::uint64_t effective_len = op.total_len + op.data_bufs.size() *
489 part_entry_overhead;
490
491 if (effective_len > part_header.params.max_part_size) {
492 return -EINVAL;
493 }
494
495 if (full_part(part_header)) {
496 return -ERANGE;
497 }
498
499 auto now = ceph::real_clock::now();
500 struct entry_header entry_header = { now };
501 ceph::buffer::list entry_header_bl;
502 encode(entry_header, entry_header_bl);
503
504 auto max_index = part_header.max_index;
505 const auto write_ofs = part_header.next_ofs;
506 auto ofs = part_header.next_ofs;
507
508 entry_header_pre pre_header;
509 pre_header.magic = part_header.magic;
510 pre_header.pre_size = sizeof(pre_header);
511 pre_header.reserved = 0;
512
513 std::uint64_t total_data = 0;
514 for (auto& data : op.data_bufs) {
515 total_data += data.length();
516 }
517 if (total_data != op.total_len) {
518 CLS_ERR("%s: length mismatch: op.total_len=%" PRId64
519 " total data received=%" PRId64,
520 __PRETTY_FUNCTION__, op.total_len, total_data);
521 return -EINVAL;
522 }
523
524
525 int entries_pushed = 0;
526 ceph::buffer::list all_data;
527 for (auto& data : op.data_bufs) {
528 if (full_part(part_header))
529 break;
530
531 pre_header.header_size = entry_header_bl.length();
532 pre_header.data_size = data.length();
533 pre_header.index = max_index;
534
535 bufferptr pre(reinterpret_cast<char*>(&pre_header), sizeof(pre_header));
536 auto entry_write_len = pre.length() + entry_header_bl.length() + data.length();
537 all_data.append(pre);
538 all_data.append(entry_header_bl);
539 all_data.claim_append(data);
540
541 part_header.last_ofs = ofs;
542 ofs += entry_write_len;
543 ++max_index;
544 ++entries_pushed;
545 part_header.max_index = max_index;
546 part_header.next_ofs = ofs;
547 }
548 part_header.max_time = now;
549
550 auto write_len = all_data.length();
551
552 r = cls_cxx_write2(hctx, write_ofs, write_len,
553 &all_data, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
554
555 if (r < 0) {
556 CLS_ERR("%s: failed to write entries (ofs=%" PRIu64
557 " len=%u): r=%d", __PRETTY_FUNCTION__, write_ofs,
558 write_len, r);
559 return r;
560 }
561
562
563 r = write_part_header(hctx, part_header);
564 if (r < 0) {
565 CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__, r);
566 return r;
567 }
568
569 if (entries_pushed == 0) {
570 CLS_ERR("%s: pushed no entries? Can't happen!", __PRETTY_FUNCTION__);
571 return -EFAULT;
572 }
573
574 return entries_pushed;
575 }
576
577 class EntryReader {
578 static constexpr std::uint64_t prefetch_len = (128 * 1024);
579
580 cls_method_context_t hctx;
581
582 const fifo::part_header& part_header;
583
584 std::uint64_t ofs;
585 ceph::buffer::list data;
586
587 int fetch(std::uint64_t num_bytes);
588 int read(std::uint64_t num_bytes, ceph::buffer::list* pbl);
589 int peek(std::uint64_t num_bytes, char *dest);
590 int seek(std::uint64_t num_bytes);
591
592 public:
593 EntryReader(cls_method_context_t hctx,
594 const fifo::part_header& part_header,
595 uint64_t ofs) : hctx(hctx),
596 part_header(part_header),
597 ofs(ofs < part_header.min_ofs ?
598 part_header.min_ofs :
599 ofs) {}
600
601 std::uint64_t get_ofs() const {
602 return ofs;
603 }
604
605 bool end() const {
606 return (ofs >= part_header.next_ofs);
607 }
608
609 int peek_pre_header(entry_header_pre* pre_header);
610 int get_next_entry(ceph::buffer::list* pbl,
611 std::uint64_t* pofs,
612 ceph::real_time* pmtime);
613 };
614
615
616 int EntryReader::fetch(std::uint64_t num_bytes)
617 {
618 CLS_LOG(5, "%s: fetch %d bytes, ofs=%d data.length()=%d", __PRETTY_FUNCTION__, (int)num_bytes, (int)ofs, (int)data.length());
619 if (data.length() < num_bytes) {
620 ceph::buffer::list bl;
621 CLS_LOG(5, "%s: reading % " PRId64 " bytes at ofs=%" PRId64, __PRETTY_FUNCTION__,
622 prefetch_len, ofs + data.length());
623 int r = cls_cxx_read2(hctx, ofs + data.length(), prefetch_len, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
624 if (r < 0) {
625 CLS_ERR("ERROR: %s: cls_cxx_read2() on obj returned %d", __PRETTY_FUNCTION__, r);
626 return r;
627 }
628 data.claim_append(bl);
629 }
630
631 if (static_cast<unsigned>(num_bytes) > data.length()) {
632 CLS_ERR("%s: requested %" PRId64 " bytes, but only "
633 "%u were available", __PRETTY_FUNCTION__, num_bytes, data.length());
634 return -ERANGE;
635 }
636
637 return 0;
638 }
639
640 int EntryReader::read(std::uint64_t num_bytes, ceph::buffer::list* pbl)
641 {
642 int r = fetch(num_bytes);
643 if (r < 0) {
644 return r;
645 }
646 data.splice(0, num_bytes, pbl);
647
648 ofs += num_bytes;
649
650 return 0;
651 }
652
653 int EntryReader::peek(std::uint64_t num_bytes, char* dest)
654 {
655 int r = fetch(num_bytes);
656 if (r < 0) {
657 return r;
658 }
659
660 data.begin().copy(num_bytes, dest);
661
662 return 0;
663 }
664
665 int EntryReader::seek(std::uint64_t num_bytes)
666 {
667 ceph::buffer::list bl;
668
669 CLS_LOG(5, "%s:%d: num_bytes=%" PRIu64, __PRETTY_FUNCTION__, __LINE__, num_bytes);
670 return read(num_bytes, &bl);
671 }
672
673 int EntryReader::peek_pre_header(entry_header_pre* pre_header)
674 {
675 if (end()) {
676 return -ENOENT;
677 }
678
679 int r = peek(sizeof(*pre_header),
680 reinterpret_cast<char*>(pre_header));
681 if (r < 0) {
682 CLS_ERR("ERROR: %s: peek() size=%zu failed: r=%d", __PRETTY_FUNCTION__,
683 sizeof(pre_header), r);
684 return r;
685 }
686
687 if (pre_header->magic != part_header.magic) {
688 CLS_ERR("ERROR: %s: unexpected pre_header magic", __PRETTY_FUNCTION__);
689 return -ERANGE;
690 }
691
692 return 0;
693 }
694
695
696 int EntryReader::get_next_entry(ceph::buffer::list* pbl,
697 std::uint64_t* pofs,
698 ceph::real_time* pmtime)
699 {
700 entry_header_pre pre_header;
701 int r = peek_pre_header(&pre_header);
702 if (r < 0) {
703 CLS_ERR("ERROR: %s: peek_pre_header() failed: r=%d", __PRETTY_FUNCTION__, r);
704 return r;
705 }
706
707 if (pofs) {
708 *pofs = ofs;
709 }
710
711 CLS_LOG(5, "%s:%d: pre_header.pre_size=%" PRIu64, __PRETTY_FUNCTION__, __LINE__,
712 uint64_t(pre_header.pre_size));
713 r = seek(pre_header.pre_size);
714 if (r < 0) {
715 CLS_ERR("ERROR: %s: failed to seek: r=%d", __PRETTY_FUNCTION__, r);
716 return r;
717 }
718
719 ceph::buffer::list header;
720 CLS_LOG(5, "%s:%d: pre_header.header_size=%d", __PRETTY_FUNCTION__, __LINE__, (int)pre_header.header_size);
721 r = read(pre_header.header_size, &header);
722 if (r < 0) {
723 CLS_ERR("ERROR: %s: failed to read entry header: r=%d", __PRETTY_FUNCTION__, r);
724 return r;
725 }
726
727 entry_header entry_header;
728 auto iter = header.cbegin();
729 try {
730 decode(entry_header, iter);
731 } catch (ceph::buffer::error& err) {
732 CLS_ERR("%s: failed decoding entry header", __PRETTY_FUNCTION__);
733 return -EIO;
734 }
735
736 if (pmtime) {
737 *pmtime = entry_header.mtime;
738 }
739
740 if (pbl) {
741 r = read(pre_header.data_size, pbl);
742 if (r < 0) {
743 CLS_ERR("%s: failed reading data: r=%d", __PRETTY_FUNCTION__, r);
744 return r;
745 }
746 } else {
747 r = seek(pre_header.data_size);
748 if (r < 0) {
749 CLS_ERR("ERROR: %s: failed to seek: r=%d", __PRETTY_FUNCTION__, r);
750 return r;
751 }
752 }
753
754 return 0;
755 }
756
757 int trim_part(cls_method_context_t hctx,
758 ceph::buffer::list *in, ceph::buffer::list *out)
759 {
760 CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
761
762 op::trim_part op;
763 try {
764 auto iter = in->cbegin();
765 decode(op, iter);
766 } catch (const ceph::buffer::error &err) {
767 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__);
768 return -EINVAL;
769 }
770
771 part_header part_header;
772 int r = read_part_header(hctx, &part_header);
773 if (r < 0) {
774 CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__);
775 return r;
776 }
777
778 if (op.tag &&
779 !(part_header.tag == *op.tag)) {
780 CLS_ERR("%s: bad tag", __PRETTY_FUNCTION__);
781 return -EINVAL;
782 }
783
784 if (op.ofs < part_header.min_ofs) {
785 return 0;
786 }
787 if (op.exclusive && op.ofs == part_header.min_ofs) {
788 return 0;
789 }
790
791 if (op.ofs >= part_header.next_ofs) {
792 if (full_part(part_header)) {
793 /*
794 * trim full part completely: remove object
795 */
796
797 r = cls_cxx_remove(hctx);
798 if (r < 0) {
799 CLS_ERR("%s: ERROR: cls_cxx_remove() returned r=%d", __PRETTY_FUNCTION__, r);
800 return r;
801 }
802
803 return 0;
804 }
805
806 part_header.min_ofs = part_header.next_ofs;
807 part_header.min_index = part_header.max_index;
808 } else {
809 EntryReader reader(hctx, part_header, op.ofs);
810
811 entry_header_pre pre_header;
812 int r = reader.peek_pre_header(&pre_header);
813 if (r < 0) {
814 return r;
815 }
816
817 if (op.exclusive) {
818 part_header.min_index = pre_header.index;
819 } else {
820 r = reader.get_next_entry(nullptr, nullptr, nullptr);
821 if (r < 0) {
822 CLS_ERR("ERROR: %s: unexpected failure at get_next_entry: r=%d",
823 __PRETTY_FUNCTION__, r);
824 return r;
825 }
826 part_header.min_index = pre_header.index + 1;
827 }
828
829 part_header.min_ofs = reader.get_ofs();
830 }
831
832 r = write_part_header(hctx, part_header);
833 if (r < 0) {
834 CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__, r);
835 return r;
836 }
837
838 return 0;
839 }
840
841 int list_part(cls_method_context_t hctx, ceph::buffer::list* in,
842 ceph::buffer::list* out)
843 {
844 CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
845
846 op::list_part op;
847 try {
848 auto iter = in->cbegin();
849 decode(op, iter);
850 } catch (const buffer::error &err) {
851 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__);
852 return -EINVAL;
853 }
854
855 part_header part_header;
856 int r = read_part_header(hctx, &part_header);
857 if (r < 0) {
858 CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__);
859 return r;
860 }
861
862 if (op.tag &&
863 !(part_header.tag == *op.tag)) {
864 CLS_ERR("%s: bad tag", __PRETTY_FUNCTION__);
865 return -EINVAL;
866 }
867
868 EntryReader reader(hctx, part_header, op.ofs);
869
870 if (op.ofs >= part_header.min_ofs &&
871 !reader.end()) {
872 r = reader.get_next_entry(nullptr, nullptr, nullptr);
873 if (r < 0) {
874 CLS_ERR("ERROR: %s: unexpected failure at get_next_entry: r=%d", __PRETTY_FUNCTION__, r);
875 return r;
876 }
877 }
878
879 op::list_part_reply reply;
880
881 reply.tag = part_header.tag;
882
883 auto max_entries = std::min(op.max_entries, op::MAX_LIST_ENTRIES);
884
885 for (int i = 0; i < max_entries && !reader.end(); ++i) {
886 ceph::buffer::list data;
887 ceph::real_time mtime;
888 std::uint64_t ofs;
889
890 r = reader.get_next_entry(&data, &ofs, &mtime);
891 if (r < 0) {
892 CLS_ERR("ERROR: %s: unexpected failure at get_next_entry: r=%d",
893 __PRETTY_FUNCTION__, r);
894 return r;
895 }
896
897 reply.entries.emplace_back(std::move(data), ofs, mtime);
898 }
899
900 reply.more = !reader.end();
901 reply.full_part = full_part(part_header);
902
903 encode(reply, *out);
904
905 return 0;
906 }
907
908 int get_part_info(cls_method_context_t hctx, ceph::buffer::list *in,
909 ceph::buffer::list *out)
910 {
911 CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
912
913 op::get_part_info op;
914 try {
915 auto iter = in->cbegin();
916 decode(op, iter);
917 } catch (const ceph::buffer::error &err) {
918 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__);
919 return -EINVAL;
920 }
921
922 op::get_part_info_reply reply;
923
924 int r = read_part_header(hctx, &reply.header);
925 if (r < 0) {
926 CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__);
927 return r;
928 }
929
930 encode(reply, *out);
931
932 return 0;
933 }
934 }
935 } // namespace rados::cls::fifo
936
937 CLS_INIT(fifo)
938 {
939 using namespace rados::cls::fifo;
940 CLS_LOG(10, "Loaded fifo class!");
941
942 cls_handle_t h_class;
943 cls_method_handle_t h_create_meta;
944 cls_method_handle_t h_get_meta;
945 cls_method_handle_t h_update_meta;
946 cls_method_handle_t h_init_part;
947 cls_method_handle_t h_push_part;
948 cls_method_handle_t h_trim_part;
949 cls_method_handle_t h_list_part;
950 cls_method_handle_t h_get_part_info;
951
952 cls_register(op::CLASS, &h_class);
953 cls_register_cxx_method(h_class, op::CREATE_META,
954 CLS_METHOD_RD | CLS_METHOD_WR,
955 create_meta, &h_create_meta);
956
957 cls_register_cxx_method(h_class, op::GET_META,
958 CLS_METHOD_RD,
959 get_meta, &h_get_meta);
960
961 cls_register_cxx_method(h_class, op::UPDATE_META,
962 CLS_METHOD_RD | CLS_METHOD_WR,
963 update_meta, &h_update_meta);
964
965 cls_register_cxx_method(h_class, op::INIT_PART,
966 CLS_METHOD_RD | CLS_METHOD_WR,
967 init_part, &h_init_part);
968
969 cls_register_cxx_method(h_class, op::PUSH_PART,
970 CLS_METHOD_RD | CLS_METHOD_WR,
971 push_part, &h_push_part);
972
973 cls_register_cxx_method(h_class, op::TRIM_PART,
974 CLS_METHOD_RD | CLS_METHOD_WR,
975 trim_part, &h_trim_part);
976
977 cls_register_cxx_method(h_class, op::LIST_PART,
978 CLS_METHOD_RD,
979 list_part, &h_list_part);
980
981 cls_register_cxx_method(h_class, op::GET_PART_INFO,
982 CLS_METHOD_RD,
983 get_part_info, &h_get_part_info);
984
985 /* calculate entry overhead */
986 struct entry_header entry_header;
987 ceph::buffer::list entry_header_bl;
988 encode(entry_header, entry_header_bl);
989
990 part_entry_overhead = sizeof(entry_header_pre) + entry_header_bl.length();
991
992 return;
993 }