]> git.proxmox.com Git - ceph.git/blob - ceph/src/cls/fifo/cls_fifo_types.h
8a471828b7a5ac139b8876cebdc53d2aaa5822f0
[ceph.git] / ceph / src / cls / fifo / cls_fifo_types.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 /*
5 * Ceph - scalable distributed file system
6 *
7 * Copyright (C) 2019 Red Hat, Inc.
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16 #pragma once
17
18 #include <cstdint>
19 #include <map>
20 #include <optional>
21 #include <ostream>
22 #include <string>
23 #include <vector>
24
25 #undef FMT_HEADER_ONLY
26 #define FMT_HEADER_ONLY 1
27 #include <fmt/format.h>
28
29 #include "include/buffer.h"
30 #include "include/encoding.h"
31 #include "include/types.h"
32
33 #include "common/ceph_time.h"
34
35 class JSONObj;
36
37 namespace rados::cls::fifo {
38 struct objv {
39 std::string instance;
40 std::uint64_t ver{0};
41
42 void encode(ceph::buffer::list& bl) const {
43 ENCODE_START(1, 1, bl);
44 encode(instance, bl);
45 encode(ver, bl);
46 ENCODE_FINISH(bl);
47 }
48 void decode(ceph::buffer::list::const_iterator& bl) {
49 DECODE_START(1, bl);
50 decode(instance, bl);
51 decode(ver, bl);
52 DECODE_FINISH(bl);
53 }
54 void dump(ceph::Formatter* f) const;
55 void decode_json(JSONObj* obj);
56
57 bool operator ==(const objv& rhs) const {
58 return (instance == rhs.instance &&
59 ver == rhs.ver);
60 }
61 bool operator !=(const objv& rhs) const {
62 return (instance != rhs.instance ||
63 ver != rhs.ver);
64 }
65 bool same_or_later(const objv& rhs) const {
66 return (instance == rhs.instance ||
67 ver >= rhs.ver);
68 }
69
70 bool empty() const {
71 return instance.empty();
72 }
73
74 std::string to_str() const {
75 return fmt::format("{}{{{}}}", instance, ver);
76 }
77 };
78 WRITE_CLASS_ENCODER(objv)
79 inline std::ostream& operator <<(std::ostream& os, const objv& objv)
80 {
81 return os << objv.to_str();
82 }
83
84 struct data_params {
85 std::uint64_t max_part_size{0};
86 std::uint64_t max_entry_size{0};
87 std::uint64_t full_size_threshold{0};
88
89 void encode(ceph::buffer::list& bl) const {
90 ENCODE_START(1, 1, bl);
91 encode(max_part_size, bl);
92 encode(max_entry_size, bl);
93 encode(full_size_threshold, bl);
94 ENCODE_FINISH(bl);
95 }
96 void decode(ceph::buffer::list::const_iterator& bl) {
97 DECODE_START(1, bl);
98 decode(max_part_size, bl);
99 decode(max_entry_size, bl);
100 decode(full_size_threshold, bl);
101 DECODE_FINISH(bl);
102 }
103 void dump(ceph::Formatter* f) const;
104 void decode_json(JSONObj* obj);
105
106 bool operator ==(const data_params& rhs) const {
107 return (max_part_size == rhs.max_part_size &&
108 max_entry_size == rhs.max_entry_size &&
109 full_size_threshold == rhs.full_size_threshold);
110 }
111 };
112 WRITE_CLASS_ENCODER(data_params)
113 inline std::ostream& operator <<(std::ostream& m, const data_params& d) {
114 return m << "max_part_size: " << d.max_part_size << ", "
115 << "max_entry_size: " << d.max_entry_size << ", "
116 << "full_size_threshold: " << d.full_size_threshold;
117 }
118
119 struct journal_entry {
120 enum class Op {
121 unknown = 0,
122 create = 1,
123 set_head = 2,
124 remove = 3,
125 } op{Op::unknown};
126
127 std::int64_t part_num{0};
128 std::string part_tag;
129
130 void encode(ceph::buffer::list& bl) const {
131 ENCODE_START(1, 1, bl);
132 encode((int)op, bl);
133 encode(part_num, bl);
134 encode(part_tag, bl);
135 ENCODE_FINISH(bl);
136 }
137 void decode(ceph::buffer::list::const_iterator& bl) {
138 DECODE_START(1, bl);
139 int i;
140 decode(i, bl);
141 op = static_cast<Op>(i);
142 decode(part_num, bl);
143 decode(part_tag, bl);
144 DECODE_FINISH(bl);
145 }
146 void dump(ceph::Formatter* f) const;
147
148 bool operator ==(const journal_entry& e) {
149 return (op == e.op &&
150 part_num == e.part_num &&
151 part_tag == e.part_tag);
152 }
153 };
154 WRITE_CLASS_ENCODER(journal_entry)
155 inline std::ostream& operator <<(std::ostream& m, const journal_entry::Op& o) {
156 switch (o) {
157 case journal_entry::Op::unknown:
158 return m << "Op::unknown";
159 case journal_entry::Op::create:
160 return m << "Op::create";
161 case journal_entry::Op::set_head:
162 return m << "Op::set_head";
163 case journal_entry::Op::remove:
164 return m << "Op::remove";
165 }
166 return m << "Bad value: " << static_cast<int>(o);
167 }
168 inline std::ostream& operator <<(std::ostream& m, const journal_entry& j) {
169 return m << "op: " << j.op << ", "
170 << "part_num: " << j.part_num << ", "
171 << "part_tag: " << j.part_tag;
172 }
173
174 // This is actually a useful builder, since otherwise we end up with
175 // four uint64_ts in a row and only care about a subset at a time.
176 class update {
177 std::optional<std::uint64_t> tail_part_num_;
178 std::optional<std::uint64_t> head_part_num_;
179 std::optional<std::uint64_t> min_push_part_num_;
180 std::optional<std::uint64_t> max_push_part_num_;
181 std::vector<fifo::journal_entry> journal_entries_add_;
182 std::vector<fifo::journal_entry> journal_entries_rm_;
183
184 public:
185
186 update&& tail_part_num(std::optional<std::uint64_t> num) noexcept {
187 tail_part_num_ = num;
188 return std::move(*this);
189 }
190 auto tail_part_num() const noexcept {
191 return tail_part_num_;
192 }
193
194 update&& head_part_num(std::optional<std::uint64_t> num) noexcept {
195 head_part_num_ = num;
196 return std::move(*this);
197 }
198 auto head_part_num() const noexcept {
199 return head_part_num_;
200 }
201
202 update&& min_push_part_num(std::optional<std::uint64_t> num)
203 noexcept {
204 min_push_part_num_ = num;
205 return std::move(*this);
206 }
207 auto min_push_part_num() const noexcept {
208 return min_push_part_num_;
209 }
210
211 update&& max_push_part_num(std::optional<std::uint64_t> num) noexcept {
212 max_push_part_num_ = num;
213 return std::move(*this);
214 }
215 auto max_push_part_num() const noexcept {
216 return max_push_part_num_;
217 }
218
219 update&& journal_entry_add(fifo::journal_entry entry) {
220 journal_entries_add_.push_back(std::move(entry));
221 return std::move(*this);
222 }
223 update&& journal_entries_add(
224 std::optional<std::vector<fifo::journal_entry>>&& entries) {
225 if (entries) {
226 journal_entries_add_ = std::move(*entries);
227 } else {
228 journal_entries_add_.clear();
229 }
230 return std::move(*this);
231 }
232 const auto& journal_entries_add() const & noexcept {
233 return journal_entries_add_;
234 }
235 auto&& journal_entries_add() && noexcept {
236 return std::move(journal_entries_add_);
237 }
238
239 update&& journal_entry_rm(fifo::journal_entry entry) {
240 journal_entries_rm_.push_back(std::move(entry));
241 return std::move(*this);
242 }
243 update&& journal_entries_rm(
244 std::optional<std::vector<fifo::journal_entry>>&& entries) {
245 if (entries) {
246 journal_entries_rm_ = std::move(*entries);
247 } else {
248 journal_entries_rm_.clear();
249 }
250 return std::move(*this);
251 }
252 const auto& journal_entries_rm() const & noexcept {
253 return journal_entries_rm_;
254 }
255 auto&& journal_entries_rm() && noexcept {
256 return std::move(journal_entries_rm_);
257 }
258 friend std::ostream& operator <<(std::ostream& m, const update& u);
259 };
260 inline std::ostream& operator <<(std::ostream& m, const update& u) {
261 bool prev = false;
262 if (u.tail_part_num_) {
263 m << "tail_part_num: " << *u.tail_part_num_;
264 prev = true;
265 }
266 if (u.head_part_num_) {
267 if (prev)
268 m << ", ";
269 m << "head_part_num: " << *u.head_part_num_;
270 prev = true;
271 }
272 if (u.min_push_part_num_) {
273 if (prev)
274 m << ", ";
275 m << "min_push_part_num: " << *u.min_push_part_num_;
276 prev = true;
277 }
278 if (u.max_push_part_num_) {
279 if (prev)
280 m << ", ";
281 m << "max_push_part_num: " << *u.max_push_part_num_;
282 prev = true;
283 }
284 if (!u.journal_entries_add_.empty()) {
285 if (prev)
286 m << ", ";
287 m << "journal_entries_add: {" << u.journal_entries_add_ << "}";
288 prev = true;
289 }
290 if (!u.journal_entries_rm_.empty()) {
291 if (prev)
292 m << ", ";
293 m << "journal_entries_rm: {" << u.journal_entries_rm_ << "}";
294 prev = true;
295 }
296 if (!prev)
297 m << "(none)";
298 return m;
299 }
300
301 struct info {
302 std::string id;
303 objv version;
304 std::string oid_prefix;
305 data_params params;
306
307 std::int64_t tail_part_num{0};
308 std::int64_t head_part_num{-1};
309 std::int64_t min_push_part_num{0};
310 std::int64_t max_push_part_num{-1};
311
312 std::string head_tag;
313 std::map<int64_t, std::string> tags;
314
315 std::multimap<int64_t, journal_entry> journal;
316
317 bool need_new_head() const {
318 return (head_part_num < min_push_part_num);
319 }
320
321 bool need_new_part() const {
322 return (max_push_part_num < min_push_part_num);
323 }
324
325 void encode(ceph::buffer::list& bl) const {
326 ENCODE_START(1, 1, bl);
327 encode(id, bl);
328 encode(version, bl);
329 encode(oid_prefix, bl);
330 encode(params, bl);
331 encode(tail_part_num, bl);
332 encode(head_part_num, bl);
333 encode(min_push_part_num, bl);
334 encode(max_push_part_num, bl);
335 encode(tags, bl);
336 encode(head_tag, bl);
337 encode(journal, bl);
338 ENCODE_FINISH(bl);
339 }
340 void decode(ceph::buffer::list::const_iterator& bl) {
341 DECODE_START(1, bl);
342 decode(id, bl);
343 decode(version, bl);
344 decode(oid_prefix, bl);
345 decode(params, bl);
346 decode(tail_part_num, bl);
347 decode(head_part_num, bl);
348 decode(min_push_part_num, bl);
349 decode(max_push_part_num, bl);
350 decode(tags, bl);
351 decode(head_tag, bl);
352 decode(journal, bl);
353 DECODE_FINISH(bl);
354 }
355 void dump(ceph::Formatter* f) const;
356 void decode_json(JSONObj* obj);
357
358 std::string part_oid(std::int64_t part_num) const {
359 return fmt::format("{}.{}", oid_prefix, part_num);
360 }
361
362 journal_entry next_journal_entry(std::string tag) const {
363 journal_entry entry;
364 entry.op = journal_entry::Op::create;
365 entry.part_num = max_push_part_num + 1;
366 entry.part_tag = std::move(tag);
367 return entry;
368 }
369
370 std::optional<std::string>
371 apply_update(const update& update) {
372 if (update.tail_part_num()) {
373 tail_part_num = *update.tail_part_num();
374 }
375
376 if (update.min_push_part_num()) {
377 min_push_part_num = *update.min_push_part_num();
378 }
379
380 if (update.max_push_part_num()) {
381 max_push_part_num = *update.max_push_part_num();
382 }
383
384 for (const auto& entry : update.journal_entries_add()) {
385 auto iter = journal.find(entry.part_num);
386 if (iter != journal.end() &&
387 iter->second.op == entry.op) {
388 /* don't allow multiple concurrent (same) operations on the same part,
389 racing clients should use objv to avoid races anyway */
390 return fmt::format("multiple concurrent operations on same part are not "
391 "allowed, part num={}", entry.part_num);
392 }
393
394 if (entry.op == journal_entry::Op::create) {
395 tags[entry.part_num] = entry.part_tag;
396 }
397
398 journal.emplace(entry.part_num, entry);
399 }
400
401 for (const auto& entry : update.journal_entries_rm()) {
402 journal.erase(entry.part_num);
403 }
404
405 if (update.head_part_num()) {
406 tags.erase(head_part_num);
407 head_part_num = *update.head_part_num();
408 auto iter = tags.find(head_part_num);
409 if (iter != tags.end()) {
410 head_tag = iter->second;
411 } else {
412 head_tag.erase();
413 }
414 }
415
416 return std::nullopt;
417 }
418 };
419 WRITE_CLASS_ENCODER(info)
420 inline std::ostream& operator <<(std::ostream& m, const info& i) {
421 return m << "id: " << i.id << ", "
422 << "version: " << i.version << ", "
423 << "oid_prefix: " << i.oid_prefix << ", "
424 << "params: {" << i.params << "}, "
425 << "tail_part_num: " << i.tail_part_num << ", "
426 << "head_part_num: " << i.head_part_num << ", "
427 << "min_push_part_num: " << i.min_push_part_num << ", "
428 << "max_push_part_num: " << i.max_push_part_num << ", "
429 << "head_tag: " << i.head_tag << ", "
430 << "tags: {" << i.tags << "}, "
431 << "journal: {" << i.journal;
432 }
433
434 struct part_list_entry {
435 ceph::buffer::list data;
436 std::uint64_t ofs = 0;
437 ceph::real_time mtime;
438
439 part_list_entry() {}
440 part_list_entry(ceph::buffer::list&& data,
441 uint64_t ofs,
442 ceph::real_time mtime)
443 : data(std::move(data)), ofs(ofs), mtime(mtime) {}
444
445
446 void encode(ceph::buffer::list& bl) const {
447 ENCODE_START(1, 1, bl);
448 encode(data, bl);
449 encode(ofs, bl);
450 encode(mtime, bl);
451 ENCODE_FINISH(bl);
452 }
453 void decode(ceph::buffer::list::const_iterator& bl) {
454 DECODE_START(1, bl);
455 decode(data, bl);
456 decode(ofs, bl);
457 decode(mtime, bl);
458 DECODE_FINISH(bl);
459 }
460 };
461 WRITE_CLASS_ENCODER(part_list_entry)
462 inline std::ostream& operator <<(std::ostream& m,
463 const part_list_entry& p) {
464 using ceph::operator <<;
465 return m << "data: " << p.data << ", "
466 << "ofs: " << p.ofs << ", "
467 << "mtime: " << p.mtime;
468 }
469
470 struct part_header {
471 std::string tag;
472
473 data_params params;
474
475 std::uint64_t magic{0};
476
477 std::uint64_t min_ofs{0};
478 std::uint64_t last_ofs{0};
479 std::uint64_t next_ofs{0};
480 std::uint64_t min_index{0};
481 std::uint64_t max_index{0};
482 ceph::real_time max_time;
483
484 void encode(ceph::buffer::list& bl) const {
485 ENCODE_START(1, 1, bl);
486 encode(tag, bl);
487 encode(params, bl);
488 encode(magic, bl);
489 encode(min_ofs, bl);
490 encode(last_ofs, bl);
491 encode(next_ofs, bl);
492 encode(min_index, bl);
493 encode(max_index, bl);
494 encode(max_time, bl);
495 ENCODE_FINISH(bl);
496 }
497 void decode(ceph::buffer::list::const_iterator& bl) {
498 DECODE_START(1, bl);
499 decode(tag, bl);
500 decode(params, bl);
501 decode(magic, bl);
502 decode(min_ofs, bl);
503 decode(last_ofs, bl);
504 decode(next_ofs, bl);
505 decode(min_index, bl);
506 decode(max_index, bl);
507 decode(max_time, bl);
508 DECODE_FINISH(bl);
509 }
510 };
511 WRITE_CLASS_ENCODER(part_header)
512 inline std::ostream& operator <<(std::ostream& m, const part_header& p) {
513 using ceph::operator <<;
514 return m << "tag: " << p.tag << ", "
515 << "params: {" << p.params << "}, "
516 << "magic: " << p.magic << ", "
517 << "min_ofs: " << p.min_ofs << ", "
518 << "last_ofs: " << p.last_ofs << ", "
519 << "next_ofs: " << p.next_ofs << ", "
520 << "min_index: " << p.min_index << ", "
521 << "max_index: " << p.max_index << ", "
522 << "max_time: " << p.max_time;
523 }
524 } // namespace rados::cls::fifo