]> git.proxmox.com Git - ceph.git/blob - ceph/src/cls/fifo/cls_fifo.cc
14313a7351655ed02094447881926ec109ec9837
[ceph.git] / ceph / src / cls / fifo / cls_fifo.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 /** \file
5 *
 * This is an OSD object class that implements methods for managing
 * and using a FIFO queue.
8 *
9 */
10
#include <algorithm>
#include <cerrno>
#include <limits>
#include <optional>
#include <sstream>
#include <string>
14
15 #undef FMT_HEADER_ONLY
16 #define FMT_HEADER_ONLY 1
17 #include <fmt/format.h>
18
19 #include "include/buffer.h"
20 #include "include/types.h"
21
22 #include "objclass/objclass.h"
23
24 #include "cls/fifo/cls_fifo_ops.h"
25 #include "cls/fifo/cls_fifo_types.h"
26
// Declare this object class's version (1.0) and name ("fifo").
CLS_VER(1,0)
CLS_NAME(fifo)
29
30 namespace rados::cls::fifo {
31
// Bytes reserved at the start of every part object for the encoded
// part_header. A freshly initialised part starts its entries at this offset,
// and larger encoded headers are refused when written.
static constexpr auto CLS_FIFO_MAX_PART_HEADER_SIZE = 512;

// Fixed per-entry framing overhead: sizeof(entry_header_pre) plus the length
// of an encoded entry_header. Computed once in CLS_INIT; zero before that.
static std::uint32_t part_entry_overhead;
35
// Fixed-size prefix written in front of every entry in a part object.
// The struct is packed and its raw bytes are copied to/from the object
// directly, so its in-memory layout IS the on-disk layout: do not reorder,
// resize or add fields.
struct entry_header_pre {
  ceph_le64 magic;       // copy of the owning part_header's magic; checked on read
  ceph_le64 pre_size;    // sizeof(entry_header_pre); bytes to skip to reach the entry_header
  ceph_le64 header_size; // length of the encoded entry_header that follows
  ceph_le64 data_size;   // length of the entry payload that follows the entry_header
  ceph_le64 index;       // index assigned to this entry when it was pushed
  ceph_le32 reserved;    // written as 0
} __attribute__ ((packed));
44
// Versioned, encoded per-entry header stored between entry_header_pre and the
// entry payload. It currently carries only the entry's modification time.
struct entry_header {
  ceph::real_time mtime; // push_part sets this to the time of the push

  // Encoding version 1, compat 1.
  void encode(ceph::buffer::list& bl) const {
    ENCODE_START(1, 1, bl);
    encode(mtime, bl);
    ENCODE_FINISH(bl);
  }
  void decode(ceph::buffer::list::const_iterator& bl) {
    DECODE_START(1, bl);
    decode(mtime, bl);
    DECODE_FINISH(bl);
  }
};
WRITE_CLASS_ENCODER(entry_header)
60
61 namespace {
62
63 std::string new_oid_prefix(std::string id, std::optional<std::string>& val)
64 {
65 static constexpr auto PREFIX_RND_SIZE = 12;
66 if (val) {
67 return *val;
68 }
69
70 char buf[PREFIX_RND_SIZE + 1];
71 buf[PREFIX_RND_SIZE] = 0;
72
73 cls_gen_rand_base64(buf, sizeof(buf) - 1);
74
75 return fmt::format("{}.{}", id, buf);
76 }
77
78 int write_header(cls_method_context_t hctx,
79 info& header,
80 bool inc_ver = true)
81 {
82 static constexpr auto HEADER_INSTANCE_SIZE = 16;
83 if (header.version.instance.empty()) {
84 char buf[HEADER_INSTANCE_SIZE + 1];
85 buf[HEADER_INSTANCE_SIZE] = 0;
86 cls_gen_rand_base64(buf, sizeof(buf) - 1);
87 header.version.instance = buf;
88 }
89 if (inc_ver) {
90 ++header.version.ver;
91 }
92 ceph::buffer::list bl;
93 encode(header, bl);
94 return cls_cxx_write_full(hctx, &bl);
95 }
96
97 int read_part_header(cls_method_context_t hctx,
98 part_header* part_header)
99 {
100 ceph::buffer::list bl;
101 int r = cls_cxx_read2(hctx, 0, CLS_FIFO_MAX_PART_HEADER_SIZE, &bl,
102 CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
103 if (r < 0) {
104 CLS_ERR("ERROR: %s: cls_cxx_read2() on obj returned %d", __PRETTY_FUNCTION__, r);
105 return r;
106 }
107
108 auto iter = bl.cbegin();
109 try {
110 decode(*part_header, iter);
111 } catch (const ceph::buffer::error& err) {
112 CLS_ERR("ERROR: %s: failed decoding part header", __PRETTY_FUNCTION__);
113 return -EIO;
114 }
115
116 using ceph::operator <<;
117 std::ostringstream ss;
118 ss << part_header->max_time;
119 CLS_LOG(5, "%s:%d read part_header:\n"
120 "\ttag=%s\n"
121 "\tmagic=0x%" PRIx64 "\n"
122 "\tmin_ofs=%" PRId64 "\n"
123 "\tlast_ofs=%" PRId64 "\n"
124 "\tnext_ofs=%" PRId64 "\n"
125 "\tmin_index=%" PRId64 "\n"
126 "\tmax_index=%" PRId64 "\n"
127 "\tmax_time=%s\n",
128 __PRETTY_FUNCTION__, __LINE__,
129 part_header->tag.c_str(),
130 part_header->magic,
131 part_header->min_ofs,
132 part_header->last_ofs,
133 part_header->next_ofs,
134 part_header->min_index,
135 part_header->max_index,
136 ss.str().c_str());
137
138 return 0;
139 }
140
141 int write_part_header(cls_method_context_t hctx,
142 part_header& part_header)
143 {
144 ceph::buffer::list bl;
145 encode(part_header, bl);
146
147 if (bl.length() > CLS_FIFO_MAX_PART_HEADER_SIZE) {
148 CLS_ERR("%s: cannot write part header, buffer exceeds max size", __PRETTY_FUNCTION__);
149 return -EIO;
150 }
151
152 int r = cls_cxx_write2(hctx, 0, bl.length(),
153 &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
154 if (r < 0) {
155 CLS_ERR("%s: failed to write part header: r=%d",
156 __PRETTY_FUNCTION__, r);
157 return r;
158 }
159
160 return 0;
161 }
162
163 int read_header(cls_method_context_t hctx,
164 std::optional<objv> objv,
165 info* info, bool get_info = false)
166 {
167 std::uint64_t size;
168
169 int r = cls_cxx_stat2(hctx, &size, nullptr);
170 if (r < 0) {
171 CLS_ERR("ERROR: %s: cls_cxx_stat2() on obj returned %d", __PRETTY_FUNCTION__, r);
172 return r;
173 }
174
175 ceph::buffer::list bl;
176 r = cls_cxx_read2(hctx, 0, size, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
177 if (r < 0) {
178 CLS_ERR("ERROR: %s: cls_cxx_read2() on obj returned %d", __PRETTY_FUNCTION__, r);
179 return r;
180 }
181
182 if (r == 0) {
183 if (get_info) {
184 CLS_LOG(5, "%s: Zero length object, likely probe, returning ENODATA", __PRETTY_FUNCTION__);
185 } else {
186 CLS_ERR("ERROR: %s: Zero length object, returning ENODATA", __PRETTY_FUNCTION__);
187 }
188 return -ENODATA;
189 }
190
191 try {
192 auto iter = bl.cbegin();
193 decode(*info, iter);
194 } catch (const ceph::buffer::error& err) {
195 CLS_ERR("ERROR: %s: failed decoding header", __PRETTY_FUNCTION__);
196 return -EIO;
197 }
198
199 if (objv && !(info->version== *objv)) {
200 auto s1 = info->version.to_str();
201 auto s2 = objv->to_str();
202 CLS_ERR("%s: version mismatch (header=%s, req=%s), canceled operation",
203 __PRETTY_FUNCTION__, s1.c_str(), s2.c_str());
204 return -ECANCELED;
205 }
206
207 return 0;
208 }
209
210 int create_meta(cls_method_context_t hctx,
211 ceph::buffer::list* in, ceph::buffer::list* out)
212 {
213 CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
214
215 op::create_meta op;
216 try {
217 auto iter = in->cbegin();
218 decode(op, iter);
219 } catch (const ceph::buffer::error& err) {
220 CLS_ERR("ERROR: %s: failed to decode request: %s", __PRETTY_FUNCTION__,
221 err.what());
222 return -EINVAL;
223 }
224
225 if (op.id.empty()) {
226 CLS_ERR("%s: ID cannot be empty", __PRETTY_FUNCTION__);
227 return -EINVAL;
228 }
229
230 if (op.max_part_size == 0 ||
231 op.max_entry_size == 0 ||
232 op.max_entry_size > op.max_part_size) {
233 CLS_ERR("ERROR: %s: invalid dimensions.", __PRETTY_FUNCTION__);
234 return -EINVAL;
235 }
236
237 std::uint64_t size;
238
239 int r = cls_cxx_stat2(hctx, &size, nullptr);
240 if (r < 0 && r != -ENOENT) {
241 CLS_ERR("ERROR: %s: cls_cxx_stat2() on obj returned %d",
242 __PRETTY_FUNCTION__, r);
243 return r;
244 }
245 if (op.exclusive && r == 0) {
246 CLS_ERR("%s: exclusive create but queue already exists",
247 __PRETTY_FUNCTION__);
248 return -EEXIST;
249 }
250
251 if (r == 0) {
252 CLS_LOG(5, "%s: FIFO already exists, reading from disk and comparing.",
253 __PRETTY_FUNCTION__);
254 ceph::buffer::list bl;
255 r = cls_cxx_read2(hctx, 0, size, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
256 if (r < 0) {
257 CLS_ERR("ERROR: %s: cls_cxx_read2() on obj returned %d",
258 __PRETTY_FUNCTION__, r);
259 return r;
260 }
261
262 info header;
263 try {
264 auto iter = bl.cbegin();
265 decode(header, iter);
266 } catch (const ceph::buffer::error& err) {
267 CLS_ERR("ERROR: %s: failed decoding header: %s",
268 __PRETTY_FUNCTION__, err.what());
269 return -EIO;
270 }
271
272 if (!(header.id == op.id &&
273 (!op.oid_prefix ||
274 header.oid_prefix == *op.oid_prefix) &&
275 (!op.version ||
276 header.version == *op.version))) {
277 CLS_ERR("%s: failed to re-create existing queue "
278 "with different params", __PRETTY_FUNCTION__);
279 return -EEXIST;
280 }
281
282 return 0; /* already exists */
283 }
284 info header;
285
286 header.id = op.id;
287 if (op.version) {
288 header.version = *op.version;
289 } else {
290 static constexpr auto DEFAULT_INSTANCE_SIZE = 16;
291 char buf[DEFAULT_INSTANCE_SIZE + 1];
292 cls_gen_rand_base64(buf, sizeof(buf));
293 buf[DEFAULT_INSTANCE_SIZE] = '\0';
294 header.version.instance = buf;
295 header.version.ver = 1;
296 }
297 header.oid_prefix = new_oid_prefix(op.id, op.oid_prefix);
298
299 header.params.max_part_size = op.max_part_size;
300 header.params.max_entry_size = op.max_entry_size;
301 header.params.full_size_threshold = op.max_part_size - op.max_entry_size - part_entry_overhead;
302
303 r = write_header(hctx, header, false);
304 if (r < 0) {
305 CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__, r);
306 return r;
307 }
308
309 return 0;
310 }
311
312 int update_meta(cls_method_context_t hctx, ceph::buffer::list* in,
313 ceph::buffer::list* out)
314 {
315 CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
316
317 op::update_meta op;
318 try {
319 auto iter = in->cbegin();
320 decode(op, iter);
321 } catch (const ceph::buffer::error& err) {
322 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__);
323 return -EINVAL;
324 }
325
326 if (op.version.empty()) {
327 CLS_ERR("%s: no version supplied", __PRETTY_FUNCTION__);
328 return -EINVAL;
329 }
330
331 info header;
332
333 int r = read_header(hctx, op.version, &header);
334 if (r < 0) {
335 return r;
336 }
337
338 auto u = fifo::update().tail_part_num(op.tail_part_num)
339 .head_part_num(op.head_part_num)
340 .min_push_part_num(op.min_push_part_num)
341 .max_push_part_num(op.max_push_part_num)
342 .journal_entries_add(
343 std::move(op.journal_entries_add))
344 .journal_entries_rm(
345 std::move(op.journal_entries_rm));
346
347 auto err = header.apply_update(u);
348 if (err) {
349 std::ostringstream ss;
350 ss << u;
351 CLS_ERR("%s: %s: %s", __PRETTY_FUNCTION__, err->c_str(),
352 ss.str().c_str());
353 return -EINVAL;
354 }
355
356 r = write_header(hctx, header);
357 if (r < 0) {
358 CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__, r);
359 return r;
360 }
361
362 return 0;
363 }
364
365 int get_meta(cls_method_context_t hctx, ceph::buffer::list* in,
366 ceph::buffer::list* out)
367 {
368 CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
369
370 op::get_meta op;
371 try {
372 auto iter = in->cbegin();
373 decode(op, iter);
374 } catch (const ceph::buffer::error &err) {
375 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__);
376 return -EINVAL;
377 }
378
379 op::get_meta_reply reply;
380 int r = read_header(hctx, op.version, &reply.info, true);
381 if (r < 0) {
382 return r;
383 }
384
385 reply.part_header_size = CLS_FIFO_MAX_PART_HEADER_SIZE;
386 reply.part_entry_overhead = part_entry_overhead;
387
388 encode(reply, *out);
389
390 return 0;
391 }
392
393 int init_part(cls_method_context_t hctx, ceph::buffer::list* in,
394 ceph::buffer::list *out)
395 {
396 CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
397
398 op::init_part op;
399 try {
400 auto iter = in->cbegin();
401 decode(op, iter);
402 } catch (const ceph::buffer::error &err) {
403 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__);
404 return -EINVAL;
405 }
406
407 std::uint64_t size;
408
409 if (op.tag.empty()) {
410 CLS_ERR("%s: tag required", __PRETTY_FUNCTION__);
411 return -EINVAL;
412 }
413
414 int r = cls_cxx_stat2(hctx, &size, nullptr);
415 if (r < 0 && r != -ENOENT) {
416 CLS_ERR("ERROR: %s: cls_cxx_stat2() on obj returned %d", __PRETTY_FUNCTION__, r);
417 return r;
418 }
419 if (r == 0 && size > 0) {
420 part_header part_header;
421 r = read_part_header(hctx, &part_header);
422 if (r < 0) {
423 CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__);
424 return r;
425 }
426
427 if (!(part_header.tag == op.tag &&
428 part_header.params == op.params)) {
429 CLS_ERR("%s: failed to re-create existing part with different "
430 "params", __PRETTY_FUNCTION__);
431 return -EEXIST;
432 }
433
434 return 0; /* already exists */
435 }
436
437 part_header part_header;
438
439 part_header.tag = op.tag;
440 part_header.params = op.params;
441
442 part_header.min_ofs = CLS_FIFO_MAX_PART_HEADER_SIZE;
443 part_header.last_ofs = 0;
444 part_header.next_ofs = part_header.min_ofs;
445 part_header.max_time = ceph::real_clock::now();
446
447 cls_gen_random_bytes(reinterpret_cast<char *>(&part_header.magic),
448 sizeof(part_header.magic));
449
450 r = write_part_header(hctx, part_header);
451 if (r < 0) {
452 CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__, r);
453 return r;
454 }
455
456 return 0;
457 }
458
459 bool full_part(const part_header& part_header)
460 {
461 return (part_header.next_ofs > part_header.params.full_size_threshold);
462 }
463
464 int push_part(cls_method_context_t hctx, ceph::buffer::list* in,
465 ceph::buffer::list* out)
466 {
467 CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
468
469 op::push_part op;
470 try {
471 auto iter = in->cbegin();
472 decode(op, iter);
473 } catch (const ceph::buffer::error& err) {
474 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__);
475 return -EINVAL;
476 }
477
478 if (op.tag.empty()) {
479 CLS_ERR("%s: tag required", __PRETTY_FUNCTION__);
480 return -EINVAL;
481 }
482
483 part_header part_header;
484 int r = read_part_header(hctx, &part_header);
485 if (r < 0) {
486 CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__);
487 return r;
488 }
489
490 if (!(part_header.tag == op.tag)) {
491 CLS_ERR("%s: bad tag", __PRETTY_FUNCTION__);
492 return -EINVAL;
493 }
494
495 std::uint64_t effective_len = op.total_len + op.data_bufs.size() *
496 part_entry_overhead;
497
498 if (effective_len > part_header.params.max_part_size) {
499 return -EINVAL;
500 }
501
502 if (full_part(part_header)) {
503 return -ERANGE;
504 }
505
506 auto now = ceph::real_clock::now();
507 struct entry_header entry_header = { now };
508 ceph::buffer::list entry_header_bl;
509 encode(entry_header, entry_header_bl);
510
511 auto max_index = part_header.max_index;
512 const auto write_ofs = part_header.next_ofs;
513 auto ofs = part_header.next_ofs;
514
515 entry_header_pre pre_header;
516 pre_header.magic = part_header.magic;
517 pre_header.pre_size = sizeof(pre_header);
518 pre_header.reserved = 0;
519
520 std::uint64_t total_data = 0;
521 for (auto& data : op.data_bufs) {
522 total_data += data.length();
523 }
524 if (total_data != op.total_len) {
525 CLS_ERR("%s: length mismatch: op.total_len=%" PRId64
526 " total data received=%" PRId64,
527 __PRETTY_FUNCTION__, op.total_len, total_data);
528 return -EINVAL;
529 }
530
531
532 int entries_pushed = 0;
533 ceph::buffer::list all_data;
534 for (auto& data : op.data_bufs) {
535 if (full_part(part_header))
536 break;
537
538 pre_header.header_size = entry_header_bl.length();
539 pre_header.data_size = data.length();
540 pre_header.index = max_index;
541
542 bufferptr pre(reinterpret_cast<char*>(&pre_header), sizeof(pre_header));
543 auto entry_write_len = pre.length() + entry_header_bl.length() + data.length();
544 all_data.append(pre);
545 all_data.append(entry_header_bl);
546 all_data.claim_append(data);
547
548 part_header.last_ofs = ofs;
549 ofs += entry_write_len;
550 ++max_index;
551 ++entries_pushed;
552 part_header.max_index = max_index;
553 part_header.next_ofs = ofs;
554 }
555 part_header.max_time = now;
556
557 auto write_len = all_data.length();
558
559 r = cls_cxx_write2(hctx, write_ofs, write_len,
560 &all_data, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
561
562 if (r < 0) {
563 CLS_ERR("%s: failed to write entries (ofs=%" PRIu64
564 " len=%u): r=%d", __PRETTY_FUNCTION__, write_ofs,
565 write_len, r);
566 return r;
567 }
568
569
570 r = write_part_header(hctx, part_header);
571 if (r < 0) {
572 CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__, r);
573 return r;
574 }
575
576 if (entries_pushed == 0) {
577 CLS_ERR("%s: pushed no entries? Can't happen!", __PRETTY_FUNCTION__);
578 return -EFAULT;
579 }
580
581 return entries_pushed;
582 }
583
/// Sequential reader over the entries of one part object.
///
/// Reading starts at `ofs`, clamped up to part_header.min_ofs. Read-ahead
/// bytes are buffered in `data`, whose first byte is at object offset `ofs`.
/// The reader keeps a reference to `part_header`, which must outlive it.
class EntryReader {
  // Read-ahead size used by fetch() when more bytes are needed.
  static constexpr std::uint64_t prefetch_len = (128 * 1024);

  cls_method_context_t hctx;

  const fifo::part_header& part_header; // not owned

  std::uint64_t ofs;        // object offset of the next unconsumed byte
  ceph::buffer::list data;  // buffered bytes starting at `ofs`

  int fetch(std::uint64_t num_bytes); // ensure num_bytes are buffered
  int read(std::uint64_t num_bytes, ceph::buffer::list* pbl); // consume into *pbl
  int peek(std::uint64_t num_bytes, char *dest); // copy without consuming
  int seek(std::uint64_t num_bytes); // consume and discard

public:
  EntryReader(cls_method_context_t hctx,
              const fifo::part_header& part_header,
              uint64_t ofs) : hctx(hctx),
                              part_header(part_header),
                              ofs(ofs < part_header.min_ofs ?
                                  part_header.min_ofs :
                                  ofs) {}

  std::uint64_t get_ofs() const {
    return ofs;
  }

  // True once the reader has reached part_header.next_ofs, the end of the
  // written entries.
  bool end() const {
    return (ofs >= part_header.next_ofs);
  }

  // Copy the next entry's prefix without consuming it; checks its magic.
  int peek_pre_header(entry_header_pre* pre_header);
  // Consume the next entry, optionally returning payload, offset and mtime.
  int get_next_entry(ceph::buffer::list* pbl,
                     std::uint64_t* pofs,
                     ceph::real_time* pmtime);
};
621
622
623 int EntryReader::fetch(std::uint64_t num_bytes)
624 {
625 CLS_LOG(5, "%s: fetch %d bytes, ofs=%d data.length()=%d", __PRETTY_FUNCTION__, (int)num_bytes, (int)ofs, (int)data.length());
626 if (data.length() < num_bytes) {
627 ceph::buffer::list bl;
628 CLS_LOG(5, "%s: reading % " PRId64 " bytes at ofs=%" PRId64, __PRETTY_FUNCTION__,
629 prefetch_len, ofs + data.length());
630 int r = cls_cxx_read2(hctx, ofs + data.length(), prefetch_len, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
631 if (r < 0) {
632 CLS_ERR("ERROR: %s: cls_cxx_read2() on obj returned %d", __PRETTY_FUNCTION__, r);
633 return r;
634 }
635 data.claim_append(bl);
636 }
637
638 if (static_cast<unsigned>(num_bytes) > data.length()) {
639 CLS_ERR("%s: requested %" PRId64 " bytes, but only "
640 "%u were available", __PRETTY_FUNCTION__, num_bytes, data.length());
641 return -ERANGE;
642 }
643
644 return 0;
645 }
646
647 int EntryReader::read(std::uint64_t num_bytes, ceph::buffer::list* pbl)
648 {
649 int r = fetch(num_bytes);
650 if (r < 0) {
651 return r;
652 }
653 data.splice(0, num_bytes, pbl);
654
655 ofs += num_bytes;
656
657 return 0;
658 }
659
660 int EntryReader::peek(std::uint64_t num_bytes, char* dest)
661 {
662 int r = fetch(num_bytes);
663 if (r < 0) {
664 return r;
665 }
666
667 data.begin().copy(num_bytes, dest);
668
669 return 0;
670 }
671
672 int EntryReader::seek(std::uint64_t num_bytes)
673 {
674 ceph::buffer::list bl;
675
676 CLS_LOG(5, "%s:%d: num_bytes=%" PRIu64, __PRETTY_FUNCTION__, __LINE__, num_bytes);
677 return read(num_bytes, &bl);
678 }
679
680 int EntryReader::peek_pre_header(entry_header_pre* pre_header)
681 {
682 if (end()) {
683 return -ENOENT;
684 }
685
686 int r = peek(sizeof(*pre_header),
687 reinterpret_cast<char*>(pre_header));
688 if (r < 0) {
689 CLS_ERR("ERROR: %s: peek() size=%zu failed: r=%d", __PRETTY_FUNCTION__,
690 sizeof(pre_header), r);
691 return r;
692 }
693
694 if (pre_header->magic != part_header.magic) {
695 CLS_ERR("ERROR: %s: unexpected pre_header magic", __PRETTY_FUNCTION__);
696 return -ERANGE;
697 }
698
699 return 0;
700 }
701
702
703 int EntryReader::get_next_entry(ceph::buffer::list* pbl,
704 std::uint64_t* pofs,
705 ceph::real_time* pmtime)
706 {
707 entry_header_pre pre_header;
708 int r = peek_pre_header(&pre_header);
709 if (r < 0) {
710 CLS_ERR("ERROR: %s: peek_pre_header() failed: r=%d", __PRETTY_FUNCTION__, r);
711 return r;
712 }
713
714 if (pofs) {
715 *pofs = ofs;
716 }
717
718 CLS_LOG(5, "%s:%d: pre_header.pre_size=%" PRIu64, __PRETTY_FUNCTION__, __LINE__,
719 uint64_t(pre_header.pre_size));
720 r = seek(pre_header.pre_size);
721 if (r < 0) {
722 CLS_ERR("ERROR: %s: failed to seek: r=%d", __PRETTY_FUNCTION__, r);
723 return r;
724 }
725
726 ceph::buffer::list header;
727 CLS_LOG(5, "%s:%d: pre_header.header_size=%d", __PRETTY_FUNCTION__, __LINE__, (int)pre_header.header_size);
728 r = read(pre_header.header_size, &header);
729 if (r < 0) {
730 CLS_ERR("ERROR: %s: failed to read entry header: r=%d", __PRETTY_FUNCTION__, r);
731 return r;
732 }
733
734 entry_header entry_header;
735 auto iter = header.cbegin();
736 try {
737 decode(entry_header, iter);
738 } catch (ceph::buffer::error& err) {
739 CLS_ERR("%s: failed decoding entry header", __PRETTY_FUNCTION__);
740 return -EIO;
741 }
742
743 if (pmtime) {
744 *pmtime = entry_header.mtime;
745 }
746
747 if (pbl) {
748 r = read(pre_header.data_size, pbl);
749 if (r < 0) {
750 CLS_ERR("%s: failed reading data: r=%d", __PRETTY_FUNCTION__, r);
751 return r;
752 }
753 } else {
754 r = seek(pre_header.data_size);
755 if (r < 0) {
756 CLS_ERR("ERROR: %s: failed to seek: r=%d", __PRETTY_FUNCTION__, r);
757 return r;
758 }
759 }
760
761 return 0;
762 }
763
764 int trim_part(cls_method_context_t hctx,
765 ceph::buffer::list *in, ceph::buffer::list *out)
766 {
767 CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
768
769 op::trim_part op;
770 try {
771 auto iter = in->cbegin();
772 decode(op, iter);
773 } catch (const ceph::buffer::error &err) {
774 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__);
775 return -EINVAL;
776 }
777
778 part_header part_header;
779 int r = read_part_header(hctx, &part_header);
780 if (r < 0) {
781 CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__);
782 return r;
783 }
784
785 if (op.tag &&
786 !(part_header.tag == *op.tag)) {
787 CLS_ERR("%s: bad tag", __PRETTY_FUNCTION__);
788 return -EINVAL;
789 }
790
791 if (op.ofs < part_header.min_ofs) {
792 return 0;
793 }
794 if (op.exclusive && op.ofs == part_header.min_ofs) {
795 return 0;
796 }
797
798 if (op.ofs >= part_header.next_ofs) {
799 if (full_part(part_header)) {
800 /*
801 * trim full part completely: remove object
802 */
803
804 r = cls_cxx_remove(hctx);
805 if (r < 0) {
806 CLS_ERR("%s: ERROR: cls_cxx_remove() returned r=%d", __PRETTY_FUNCTION__, r);
807 return r;
808 }
809
810 return 0;
811 }
812
813 part_header.min_ofs = part_header.next_ofs;
814 part_header.min_index = part_header.max_index;
815 } else {
816 EntryReader reader(hctx, part_header, op.ofs);
817
818 entry_header_pre pre_header;
819 int r = reader.peek_pre_header(&pre_header);
820 if (r < 0) {
821 return r;
822 }
823
824 if (op.exclusive) {
825 part_header.min_index = pre_header.index;
826 } else {
827 r = reader.get_next_entry(nullptr, nullptr, nullptr);
828 if (r < 0) {
829 CLS_ERR("ERROR: %s: unexpected failure at get_next_entry: r=%d",
830 __PRETTY_FUNCTION__, r);
831 return r;
832 }
833 part_header.min_index = pre_header.index + 1;
834 }
835
836 part_header.min_ofs = reader.get_ofs();
837 }
838
839 r = write_part_header(hctx, part_header);
840 if (r < 0) {
841 CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__, r);
842 return r;
843 }
844
845 return 0;
846 }
847
848 int list_part(cls_method_context_t hctx, ceph::buffer::list* in,
849 ceph::buffer::list* out)
850 {
851 CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
852
853 op::list_part op;
854 try {
855 auto iter = in->cbegin();
856 decode(op, iter);
857 } catch (const buffer::error &err) {
858 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__);
859 return -EINVAL;
860 }
861
862 part_header part_header;
863 int r = read_part_header(hctx, &part_header);
864 if (r < 0) {
865 CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__);
866 return r;
867 }
868
869 if (op.tag &&
870 !(part_header.tag == *op.tag)) {
871 CLS_ERR("%s: bad tag", __PRETTY_FUNCTION__);
872 return -EINVAL;
873 }
874
875 EntryReader reader(hctx, part_header, op.ofs);
876
877 if (op.ofs >= part_header.min_ofs &&
878 !reader.end()) {
879 r = reader.get_next_entry(nullptr, nullptr, nullptr);
880 if (r < 0) {
881 CLS_ERR("ERROR: %s: unexpected failure at get_next_entry: r=%d", __PRETTY_FUNCTION__, r);
882 return r;
883 }
884 }
885
886 op::list_part_reply reply;
887
888 reply.tag = part_header.tag;
889
890 auto max_entries = std::min(op.max_entries, op::MAX_LIST_ENTRIES);
891
892 for (int i = 0; i < max_entries && !reader.end(); ++i) {
893 ceph::buffer::list data;
894 ceph::real_time mtime;
895 std::uint64_t ofs;
896
897 r = reader.get_next_entry(&data, &ofs, &mtime);
898 if (r < 0) {
899 CLS_ERR("ERROR: %s: unexpected failure at get_next_entry: r=%d",
900 __PRETTY_FUNCTION__, r);
901 return r;
902 }
903
904 reply.entries.emplace_back(std::move(data), ofs, mtime);
905 }
906
907 reply.more = !reader.end();
908 reply.full_part = full_part(part_header);
909
910 encode(reply, *out);
911
912 return 0;
913 }
914
915 int get_part_info(cls_method_context_t hctx, ceph::buffer::list *in,
916 ceph::buffer::list *out)
917 {
918 CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
919
920 op::get_part_info op;
921 try {
922 auto iter = in->cbegin();
923 decode(op, iter);
924 } catch (const ceph::buffer::error &err) {
925 CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__);
926 return -EINVAL;
927 }
928
929 op::get_part_info_reply reply;
930
931 int r = read_part_header(hctx, &reply.header);
932 if (r < 0) {
933 CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__);
934 return r;
935 }
936
937 encode(reply, *out);
938
939 return 0;
940 }
941 }
942 } // namespace rados::cls::fifo
943
944 CLS_INIT(fifo)
945 {
946 using namespace rados::cls::fifo;
947 CLS_LOG(10, "Loaded fifo class!");
948
949 cls_handle_t h_class;
950 cls_method_handle_t h_create_meta;
951 cls_method_handle_t h_get_meta;
952 cls_method_handle_t h_update_meta;
953 cls_method_handle_t h_init_part;
954 cls_method_handle_t h_push_part;
955 cls_method_handle_t h_trim_part;
956 cls_method_handle_t h_list_part;
957 cls_method_handle_t h_get_part_info;
958
959 cls_register(op::CLASS, &h_class);
960 cls_register_cxx_method(h_class, op::CREATE_META,
961 CLS_METHOD_RD | CLS_METHOD_WR,
962 create_meta, &h_create_meta);
963
964 cls_register_cxx_method(h_class, op::GET_META,
965 CLS_METHOD_RD,
966 get_meta, &h_get_meta);
967
968 cls_register_cxx_method(h_class, op::UPDATE_META,
969 CLS_METHOD_RD | CLS_METHOD_WR,
970 update_meta, &h_update_meta);
971
972 cls_register_cxx_method(h_class, op::INIT_PART,
973 CLS_METHOD_RD | CLS_METHOD_WR,
974 init_part, &h_init_part);
975
976 cls_register_cxx_method(h_class, op::PUSH_PART,
977 CLS_METHOD_RD | CLS_METHOD_WR,
978 push_part, &h_push_part);
979
980 cls_register_cxx_method(h_class, op::TRIM_PART,
981 CLS_METHOD_RD | CLS_METHOD_WR,
982 trim_part, &h_trim_part);
983
984 cls_register_cxx_method(h_class, op::LIST_PART,
985 CLS_METHOD_RD,
986 list_part, &h_list_part);
987
988 cls_register_cxx_method(h_class, op::GET_PART_INFO,
989 CLS_METHOD_RD,
990 get_part_info, &h_get_part_info);
991
992 /* calculate entry overhead */
993 struct entry_header entry_header;
994 ceph::buffer::list entry_header_bl;
995 encode(entry_header, entry_header_bl);
996
997 part_entry_overhead = sizeof(entry_header_pre) + entry_header_bl.length();
998
999 return;
1000 }