]> git.proxmox.com Git - ceph.git/blame - ceph/src/cls/journal/cls_journal.cc
update sources to v12.2.3
[ceph.git] / ceph / src / cls / journal / cls_journal.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
#include "include/int_types.h"
#include "include/buffer.h"
#include "include/encoding.h"
#include "common/errno.h"
#include "objclass/objclass.h"
#include "cls/journal/cls_journal_types.h"
#include <errno.h>
#include <iomanip>
#include <limits>
#include <map>
#include <set>
#include <sstream>
#include <string>
14
// Object class metadata: version 1.0, class name "journal".
CLS_VER(1, 0)
CLS_NAME(journal)
17
18namespace {
19
// Page size (number of omap entries) used when scanning clients and tags
// with cls_cxx_map_get_vals().
static const uint64_t MAX_KEYS_READ(64);

// Omap keys of the journal header fields.
static const std::string HEADER_KEY_ORDER("order");
static const std::string HEADER_KEY_SPLAY_WIDTH("splay_width");
static const std::string HEADER_KEY_POOL_ID("pool_id");
static const std::string HEADER_KEY_MINIMUM_SET("minimum_set");
static const std::string HEADER_KEY_ACTIVE_SET("active_set");
static const std::string HEADER_KEY_NEXT_TAG_TID("next_tag_tid");
static const std::string HEADER_KEY_NEXT_TAG_CLASS("next_tag_class");

// Prefixes of the per-client and per-tag omap keys.
static const std::string HEADER_KEY_CLIENT_PREFIX("client_");
static const std::string HEADER_KEY_TAG_PREFIX("tag_");
31
/**
 * Format @p value as a 16-digit, zero-padded, lowercase hexadecimal string.
 *
 * The fixed width makes lexicographic ordering of the resulting strings
 * match numeric ordering. Tag omap keys are built from it, and the tag scans
 * rely on visiting tags in ascending tid order.
 *
 * Requires <iomanip> for std::setw / std::setfill.
 */
std::string to_hex(uint64_t value) {
  std::ostringstream oss;
  oss << std::setw(16) << std::setfill('0') << std::hex << value;
  return oss.str();
}
37
38std::string key_from_client_id(const std::string &client_id) {
39 return HEADER_KEY_CLIENT_PREFIX + client_id;
40}
41
42std::string key_from_tag_tid(uint64_t tag_tid) {
43 return HEADER_KEY_TAG_PREFIX + to_hex(tag_tid);
44}
45
46uint64_t tag_tid_from_key(const std::string &key) {
47 std::istringstream iss(key);
48 uint64_t id;
49 iss.ignore(HEADER_KEY_TAG_PREFIX.size()) >> std::hex >> id;
50 return id;
51}
52
53template <typename T>
54int read_key(cls_method_context_t hctx, const string &key, T *t,
55 bool ignore_enoent = false) {
56 bufferlist bl;
57 int r = cls_cxx_map_get_val(hctx, key, &bl);
58 if (r == -ENOENT && ignore_enoent) {
59 return 0;
60 } else if (r < 0) {
61 CLS_ERR("failed to get omap key: %s", key.c_str());
62 return r;
63 }
64
65 try {
66 bufferlist::iterator iter = bl.begin();
67 ::decode(*t, iter);
68 } catch (const buffer::error &err) {
69 CLS_ERR("failed to decode input parameters: %s", err.what());
70 return -EINVAL;
71 }
72 return 0;
73}
74
75template <typename T>
76int write_key(cls_method_context_t hctx, const string &key, const T &t) {
77 bufferlist bl;
78 ::encode(t, bl);
79
80 int r = cls_cxx_map_set_val(hctx, key, &bl);
81 if (r < 0) {
82 CLS_ERR("failed to set omap key: %s", key.c_str());
83 return r;
84 }
85 return 0;
86}
87
88int remove_key(cls_method_context_t hctx, const string &key) {
89 int r = cls_cxx_map_remove_key(hctx, key);
90 if (r < 0 && r != -ENOENT) {
91 CLS_ERR("failed to remove key: %s", key.c_str());
92 return r;
93 }
94 return 0;
95}
96
97int expire_tags(cls_method_context_t hctx, const std::string *skip_client_id) {
98
99 std::string skip_client_key;
100 if (skip_client_id != nullptr) {
101 skip_client_key = key_from_client_id(*skip_client_id);
102 }
103
7c673cae 104 uint64_t minimum_tag_tid = std::numeric_limits<uint64_t>::max();
b32b8144 105 std::string last_read = "";
c07f9fc5 106 bool more;
7c673cae
FG
107 do {
108 std::map<std::string, bufferlist> vals;
c07f9fc5
FG
109 int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_CLIENT_PREFIX,
110 MAX_KEYS_READ, &vals, &more);
7c673cae
FG
111 if (r < 0 && r != -ENOENT) {
112 CLS_ERR("failed to retrieve registered clients: %s",
113 cpp_strerror(r).c_str());
114 return r;
115 }
116
117 for (auto &val : vals) {
118 // if we are removing a client, skip its commit positions
119 if (val.first == skip_client_key) {
120 continue;
121 }
122
123 cls::journal::Client client;
124 bufferlist::iterator iter = val.second.begin();
125 try {
126 ::decode(client, iter);
127 } catch (const buffer::error &err) {
128 CLS_ERR("error decoding registered client: %s",
129 val.first.c_str());
130 return -EIO;
131 }
132
b32b8144
FG
133 if (client.state == cls::journal::CLIENT_STATE_DISCONNECTED) {
134 // don't allow a disconnected client to prevent pruning
135 continue;
136 } else if (client.commit_position.object_positions.empty()) {
137 // cannot prune if one or more clients has an empty commit history
138 return 0;
139 }
140
7c673cae
FG
141 for (auto object_position : client.commit_position.object_positions) {
142 minimum_tag_tid = MIN(minimum_tag_tid, object_position.tag_tid);
143 }
144 }
145 if (!vals.empty()) {
146 last_read = vals.rbegin()->first;
147 }
c07f9fc5 148 } while (more);
7c673cae
FG
149
150 // cannot expire tags if a client hasn't committed yet
151 if (minimum_tag_tid == std::numeric_limits<uint64_t>::max()) {
152 return 0;
153 }
154
155 // compute the minimum in-use tag for each class
156 std::map<uint64_t, uint64_t> minimum_tag_class_to_tids;
157 typedef enum { TAG_PASS_CALCULATE_MINIMUMS,
158 TAG_PASS_SCRUB,
159 TAG_PASS_DONE } TagPass;
160 int tag_pass = TAG_PASS_CALCULATE_MINIMUMS;
161 last_read = HEADER_KEY_TAG_PREFIX;
162 do {
163 std::map<std::string, bufferlist> vals;
c07f9fc5
FG
164 int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_TAG_PREFIX,
165 MAX_KEYS_READ, &vals, &more);
7c673cae
FG
166 if (r < 0 && r != -ENOENT) {
167 CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r).c_str());
168 return r;
169 }
170
171 for (auto &val : vals) {
172 cls::journal::Tag tag;
173 bufferlist::iterator iter = val.second.begin();
174 try {
175 ::decode(tag, iter);
176 } catch (const buffer::error &err) {
177 CLS_ERR("error decoding tag: %s", val.first.c_str());
178 return -EIO;
179 }
180
181 if (tag.tid != tag_tid_from_key(val.first)) {
182 CLS_ERR("tag tid mismatched: %s", val.first.c_str());
183 return -EINVAL;
184 }
185
186 if (tag_pass == TAG_PASS_CALCULATE_MINIMUMS) {
187 minimum_tag_class_to_tids[tag.tag_class] = tag.tid;
188 } else if (tag_pass == TAG_PASS_SCRUB &&
189 tag.tid < minimum_tag_class_to_tids[tag.tag_class]) {
190 r = remove_key(hctx, val.first);
191 if (r < 0) {
192 return r;
193 }
194 }
195
196 if (tag.tid >= minimum_tag_tid) {
197 // no need to check for tag classes beyond this point
198 vals.clear();
3efd9988 199 more = false;
7c673cae
FG
200 break;
201 }
202 }
203
c07f9fc5 204 if (tag_pass != TAG_PASS_DONE && !more) {
7c673cae
FG
205 last_read = HEADER_KEY_TAG_PREFIX;
206 ++tag_pass;
207 } else if (!vals.empty()) {
208 last_read = vals.rbegin()->first;
209 }
210 } while (tag_pass != TAG_PASS_DONE);
211 return 0;
212}
213
214int get_client_list_range(cls_method_context_t hctx,
215 std::set<cls::journal::Client> *clients,
216 std::string start_after, uint64_t max_return) {
217 std::string last_read;
218 if (!start_after.empty()) {
219 last_read = key_from_client_id(start_after);
220 }
221
222 std::map<std::string, bufferlist> vals;
c07f9fc5 223 bool more;
7c673cae 224 int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_CLIENT_PREFIX,
c07f9fc5 225 max_return, &vals, &more);
7c673cae
FG
226 if (r < 0) {
227 CLS_ERR("failed to retrieve omap values: %s", cpp_strerror(r).c_str());
228 return r;
229 }
230
231 for (std::map<std::string, bufferlist>::iterator it = vals.begin();
232 it != vals.end(); ++it) {
233 try {
234 bufferlist::iterator iter = it->second.begin();
235
236 cls::journal::Client client;
237 ::decode(client, iter);
238 clients->insert(client);
239 } catch (const buffer::error &err) {
240 CLS_ERR("could not decode client '%s': %s", it->first.c_str(),
241 err.what());
242 return -EIO;
243 }
244 }
245
246 return 0;
247}
248
249int find_min_commit_position(cls_method_context_t hctx,
250 cls::journal::ObjectSetPosition *minset) {
251 int r;
252 bool valid = false;
253 std::string start_after = "";
254 uint64_t tag_tid = 0, entry_tid = 0;
255
256 while (true) {
257 std::set<cls::journal::Client> batch;
258
259 r = get_client_list_range(hctx, &batch, start_after, cls::journal::JOURNAL_MAX_RETURN);
260 if ((r < 0) || batch.empty()) {
261 break;
262 }
263
264 start_after = batch.rbegin()->id;
265
266 // update the (minimum) commit position from this batch of clients
267 for(std::set<cls::journal::Client>::iterator it = batch.begin();
268 it != batch.end(); ++it) {
269 cls::journal::ObjectSetPosition object_set_position = (*it).commit_position;
270 if (object_set_position.object_positions.empty()) {
271 *minset = cls::journal::ObjectSetPosition();
272 break;
273 }
274 cls::journal::ObjectPosition first = object_set_position.object_positions.front();
275
276 // least tag_tid (or least entry_tid for matching tag_tid)
277 if (!valid || (tag_tid > first.tag_tid) || ((tag_tid == first.tag_tid) && (entry_tid > first.entry_tid))) {
278 tag_tid = first.tag_tid;
279 entry_tid = first.entry_tid;
280 *minset = cls::journal::ObjectSetPosition(object_set_position);
281 valid = true;
282 }
283 }
284
285 // got the last batch, we're done
286 if (batch.size() < cls::journal::JOURNAL_MAX_RETURN) {
287 break;
288 }
289 }
290
291 return r;
292}
293
294} // anonymous namespace
295
296/**
297 * Input:
298 * @param order (uint8_t) - bits to shift to compute the object max size
299 * @param splay width (uint8_t) - number of active journal objects
300 *
301 * Output:
302 * @returns 0 on success, negative error code on failure
303 */
304int journal_create(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
305 uint8_t order;
306 uint8_t splay_width;
307 int64_t pool_id;
308 try {
309 bufferlist::iterator iter = in->begin();
310 ::decode(order, iter);
311 ::decode(splay_width, iter);
312 ::decode(pool_id, iter);
313 } catch (const buffer::error &err) {
314 CLS_ERR("failed to decode input parameters: %s", err.what());
315 return -EINVAL;
316 }
317
318 bufferlist stored_orderbl;
319 int r = cls_cxx_map_get_val(hctx, HEADER_KEY_ORDER, &stored_orderbl);
320 if (r >= 0) {
321 CLS_ERR("journal already exists");
322 return -EEXIST;
323 } else if (r != -ENOENT) {
324 return r;
325 }
326
327 r = write_key(hctx, HEADER_KEY_ORDER, order);
328 if (r < 0) {
329 return r;
330 }
331
332 r = write_key(hctx, HEADER_KEY_SPLAY_WIDTH, splay_width);
333 if (r < 0) {
334 return r;
335 }
336
337 r = write_key(hctx, HEADER_KEY_POOL_ID, pool_id);
338 if (r < 0) {
339 return r;
340 }
341
342 uint64_t object_set = 0;
343 r = write_key(hctx, HEADER_KEY_ACTIVE_SET, object_set);
344 if (r < 0) {
345 return r;
346 }
347
348 r = write_key(hctx, HEADER_KEY_MINIMUM_SET, object_set);
349 if (r < 0) {
350 return r;
351 }
352
353 uint64_t tag_id = 0;
354 r = write_key(hctx, HEADER_KEY_NEXT_TAG_TID, tag_id);
355 if (r < 0) {
356 return r;
357 }
358
359 r = write_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, tag_id);
360 if (r < 0) {
361 return r;
362 }
363 return 0;
364}
365
366/**
367 * Input:
368 * none
369 *
370 * Output:
371 * order (uint8_t)
372 * @returns 0 on success, negative error code on failure
373 */
374int journal_get_order(cls_method_context_t hctx, bufferlist *in,
375 bufferlist *out) {
376 uint8_t order;
377 int r = read_key(hctx, HEADER_KEY_ORDER, &order);
378 if (r < 0) {
379 return r;
380 }
381
382 ::encode(order, *out);
383 return 0;
384}
385
386/**
387 * Input:
388 * none
389 *
390 * Output:
391 * order (uint8_t)
392 * @returns 0 on success, negative error code on failure
393 */
394int journal_get_splay_width(cls_method_context_t hctx, bufferlist *in,
395 bufferlist *out) {
396 uint8_t splay_width;
397 int r = read_key(hctx, HEADER_KEY_SPLAY_WIDTH, &splay_width);
398 if (r < 0) {
399 return r;
400 }
401
402 ::encode(splay_width, *out);
403 return 0;
404}
405
406/**
407 * Input:
408 * none
409 *
410 * Output:
411 * pool_id (int64_t)
412 * @returns 0 on success, negative error code on failure
413 */
414int journal_get_pool_id(cls_method_context_t hctx, bufferlist *in,
415 bufferlist *out) {
416 int64_t pool_id;
417 int r = read_key(hctx, HEADER_KEY_POOL_ID, &pool_id);
418 if (r < 0) {
419 return r;
420 }
421
422 ::encode(pool_id, *out);
423 return 0;
424}
425
426/**
427 * Input:
428 * none
429 *
430 * Output:
431 * object set (uint64_t)
432 * @returns 0 on success, negative error code on failure
433 */
434int journal_get_minimum_set(cls_method_context_t hctx, bufferlist *in,
435 bufferlist *out) {
436 uint64_t minimum_set;
437 int r = read_key(hctx, HEADER_KEY_MINIMUM_SET, &minimum_set);
438 if (r < 0) {
439 return r;
440 }
441
442 ::encode(minimum_set, *out);
443 return 0;
444}
445
446/**
447 * Input:
448 * @param object set (uint64_t)
449 *
450 * Output:
451 * @returns 0 on success, negative error code on failure
452 */
453int journal_set_minimum_set(cls_method_context_t hctx, bufferlist *in,
454 bufferlist *out) {
455 uint64_t object_set;
456 try {
457 bufferlist::iterator iter = in->begin();
458 ::decode(object_set, iter);
459 } catch (const buffer::error &err) {
460 CLS_ERR("failed to decode input parameters: %s", err.what());
461 return -EINVAL;
462 }
463
464 uint64_t current_active_set;
465 int r = read_key(hctx, HEADER_KEY_ACTIVE_SET, &current_active_set);
466 if (r < 0) {
467 return r;
468 }
469
470 if (current_active_set < object_set) {
471 CLS_ERR("active object set earlier than minimum: %" PRIu64
472 " < %" PRIu64, current_active_set, object_set);
473 return -EINVAL;
474 }
475
476 uint64_t current_minimum_set;
477 r = read_key(hctx, HEADER_KEY_MINIMUM_SET, &current_minimum_set);
478 if (r < 0) {
479 return r;
480 }
481
482 if (object_set == current_minimum_set) {
483 return 0;
484 } else if (object_set < current_minimum_set) {
485 CLS_ERR("object number earlier than current object: %" PRIu64 " < %" PRIu64,
486 object_set, current_minimum_set);
487 return -ESTALE;
488 }
489
490 r = write_key(hctx, HEADER_KEY_MINIMUM_SET, object_set);
491 if (r < 0) {
492 return r;
493 }
494 return 0;
495}
496
497/**
498 * Input:
499 * none
500 *
501 * Output:
502 * object set (uint64_t)
503 * @returns 0 on success, negative error code on failure
504 */
505int journal_get_active_set(cls_method_context_t hctx, bufferlist *in,
506 bufferlist *out) {
507 uint64_t active_set;
508 int r = read_key(hctx, HEADER_KEY_ACTIVE_SET, &active_set);
509 if (r < 0) {
510 return r;
511 }
512
513 ::encode(active_set, *out);
514 return 0;
515}
516
517/**
518 * Input:
519 * @param object set (uint64_t)
520 *
521 * Output:
522 * @returns 0 on success, negative error code on failure
523 */
524int journal_set_active_set(cls_method_context_t hctx, bufferlist *in,
525 bufferlist *out) {
526 uint64_t object_set;
527 try {
528 bufferlist::iterator iter = in->begin();
529 ::decode(object_set, iter);
530 } catch (const buffer::error &err) {
531 CLS_ERR("failed to decode input parameters: %s", err.what());
532 return -EINVAL;
533 }
534
535 uint64_t current_minimum_set;
536 int r = read_key(hctx, HEADER_KEY_MINIMUM_SET, &current_minimum_set);
537 if (r < 0) {
538 return r;
539 }
540
541 if (current_minimum_set > object_set) {
542 CLS_ERR("minimum object set later than active: %" PRIu64
543 " > %" PRIu64, current_minimum_set, object_set);
544 return -EINVAL;
545 }
546
547 uint64_t current_active_set;
548 r = read_key(hctx, HEADER_KEY_ACTIVE_SET, &current_active_set);
549 if (r < 0) {
550 return r;
551 }
552
553 if (object_set == current_active_set) {
554 return 0;
555 } else if (object_set < current_active_set) {
556 CLS_ERR("object number earlier than current object: %" PRIu64 " < %" PRIu64,
557 object_set, current_active_set);
558 return -ESTALE;
559 }
560
561 r = write_key(hctx, HEADER_KEY_ACTIVE_SET, object_set);
562 if (r < 0) {
563 return r;
564 }
565 return 0;
566}
567
568/**
569 * Input:
570 * @param id (string) - unique client id
571 *
572 * Output:
573 * cls::journal::Client
574 * @returns 0 on success, negative error code on failure
575 */
576int journal_get_client(cls_method_context_t hctx, bufferlist *in,
577 bufferlist *out) {
578 std::string id;
579 try {
580 bufferlist::iterator iter = in->begin();
581 ::decode(id, iter);
582 } catch (const buffer::error &err) {
583 CLS_ERR("failed to decode input parameters: %s", err.what());
584 return -EINVAL;
585 }
586
587 std::string key(key_from_client_id(id));
588 cls::journal::Client client;
589 int r = read_key(hctx, key, &client);
590 if (r < 0) {
591 return r;
592 }
593
594 ::encode(client, *out);
595 return 0;
596}
597
598/**
599 * Input:
600 * @param id (string) - unique client id
601 * @param data (bufferlist) - opaque data associated to client
602 *
603 * Output:
604 * @returns 0 on success, negative error code on failure
605 */
606int journal_client_register(cls_method_context_t hctx, bufferlist *in,
607 bufferlist *out) {
608 std::string id;
609 bufferlist data;
610 try {
611 bufferlist::iterator iter = in->begin();
612 ::decode(id, iter);
613 ::decode(data, iter);
614 } catch (const buffer::error &err) {
615 CLS_ERR("failed to decode input parameters: %s", err.what());
616 return -EINVAL;
617 }
618
619 uint8_t order;
620 int r = read_key(hctx, HEADER_KEY_ORDER, &order);
621 if (r < 0) {
622 return r;
623 }
624
625 std::string key(key_from_client_id(id));
626 bufferlist stored_clientbl;
627 r = cls_cxx_map_get_val(hctx, key, &stored_clientbl);
628 if (r >= 0) {
629 CLS_ERR("duplicate client id: %s", id.c_str());
630 return -EEXIST;
631 } else if (r != -ENOENT) {
632 return r;
633 }
634
635 cls::journal::ObjectSetPosition minset;
636 r = find_min_commit_position(hctx, &minset);
637 if (r < 0)
638 return r;
639
640 cls::journal::Client client(id, data, minset);
641 r = write_key(hctx, key, client);
642 if (r < 0) {
643 return r;
644 }
645 return 0;
646}
647
648/**
649 * Input:
650 * @param id (string) - unique client id
651 * @param data (bufferlist) - opaque data associated to client
652 *
653 * Output:
654 * @returns 0 on success, negative error code on failure
655 */
656int journal_client_update_data(cls_method_context_t hctx, bufferlist *in,
657 bufferlist *out) {
658 std::string id;
659 bufferlist data;
660 try {
661 bufferlist::iterator iter = in->begin();
662 ::decode(id, iter);
663 ::decode(data, iter);
664 } catch (const buffer::error &err) {
665 CLS_ERR("failed to decode input parameters: %s", err.what());
666 return -EINVAL;
667 }
668
669 std::string key(key_from_client_id(id));
670 cls::journal::Client client;
671 int r = read_key(hctx, key, &client);
672 if (r < 0) {
673 return r;
674 }
675
676 client.data = data;
677 r = write_key(hctx, key, client);
678 if (r < 0) {
679 return r;
680 }
681 return 0;
682}
683
684/**
685 * Input:
686 * @param id (string) - unique client id
687 * @param state (uint8_t) - client state
688 *
689 * Output:
690 * @returns 0 on success, negative error code on failure
691 */
692int journal_client_update_state(cls_method_context_t hctx, bufferlist *in,
693 bufferlist *out) {
694 std::string id;
695 cls::journal::ClientState state;
696 bufferlist data;
697 try {
698 bufferlist::iterator iter = in->begin();
699 ::decode(id, iter);
700 uint8_t state_raw;
701 ::decode(state_raw, iter);
702 state = static_cast<cls::journal::ClientState>(state_raw);
703 } catch (const buffer::error &err) {
704 CLS_ERR("failed to decode input parameters: %s", err.what());
705 return -EINVAL;
706 }
707
708 std::string key(key_from_client_id(id));
709 cls::journal::Client client;
710 int r = read_key(hctx, key, &client);
711 if (r < 0) {
712 return r;
713 }
714
715 client.state = state;
716 r = write_key(hctx, key, client);
717 if (r < 0) {
718 return r;
719 }
720 return 0;
721}
722
723/**
724 * Input:
725 * @param id (string) - unique client id
726 *
727 * Output:
728 * @returns 0 on success, negative error code on failure
729 */
730int journal_client_unregister(cls_method_context_t hctx, bufferlist *in,
731 bufferlist *out) {
732 std::string id;
733 try {
734 bufferlist::iterator iter = in->begin();
735 ::decode(id, iter);
736 } catch (const buffer::error &err) {
737 CLS_ERR("failed to decode input parameters: %s", err.what());
738 return -EINVAL;
739 }
740
741 std::string key(key_from_client_id(id));
742 bufferlist bl;
743 int r = cls_cxx_map_get_val(hctx, key, &bl);
744 if (r < 0) {
745 CLS_ERR("client is not registered: %s", id.c_str());
746 return r;
747 }
748
749 r = cls_cxx_map_remove_key(hctx, key);
750 if (r < 0) {
751 CLS_ERR("failed to remove omap key: %s", key.c_str());
752 return r;
753 }
754
755 // prune expired tags
756 r = expire_tags(hctx, &id);
757 if (r < 0) {
758 return r;
759 }
760 return 0;
761}
762
763/**
764 * Input:
765 * @param client_id (uint64_t) - unique client id
766 * @param commit_position (ObjectSetPosition)
767 *
768 * Output:
769 * @returns 0 on success, negative error code on failure
770 */
771int journal_client_commit(cls_method_context_t hctx, bufferlist *in,
772 bufferlist *out) {
773 std::string id;
774 cls::journal::ObjectSetPosition commit_position;
775 try {
776 bufferlist::iterator iter = in->begin();
777 ::decode(id, iter);
778 ::decode(commit_position, iter);
779 } catch (const buffer::error &err) {
780 CLS_ERR("failed to decode input parameters: %s", err.what());
781 return -EINVAL;
782 }
783
784 uint8_t splay_width;
785 int r = read_key(hctx, HEADER_KEY_SPLAY_WIDTH, &splay_width);
786 if (r < 0) {
787 return r;
788 }
789 if (commit_position.object_positions.size() > splay_width) {
790 CLS_ERR("too many object positions");
791 return -EINVAL;
792 }
793
794 std::string key(key_from_client_id(id));
795 cls::journal::Client client;
796 r = read_key(hctx, key, &client);
797 if (r < 0) {
798 return r;
799 }
800
801 if (client.commit_position == commit_position) {
802 return 0;
803 }
804
805 client.commit_position = commit_position;
806 r = write_key(hctx, key, client);
807 if (r < 0) {
808 return r;
809 }
810 return 0;
811}
812
813/**
814 * Input:
815 * @param start_after (string)
816 * @param max_return (uint64_t)
817 *
818 * Output:
819 * clients (set<cls::journal::Client>) - collection of registered clients
820 * @returns 0 on success, negative error code on failure
821 */
822int journal_client_list(cls_method_context_t hctx, bufferlist *in,
823 bufferlist *out) {
824 std::string start_after;
825 uint64_t max_return;
826 try {
827 bufferlist::iterator iter = in->begin();
828 ::decode(start_after, iter);
829 ::decode(max_return, iter);
830 } catch (const buffer::error &err) {
831 CLS_ERR("failed to decode input parameters: %s", err.what());
832 return -EINVAL;
833 }
834
835 std::set<cls::journal::Client> clients;
836 int r = get_client_list_range(hctx, &clients, start_after, max_return);
837 if (r < 0)
838 return r;
839
840 ::encode(clients, *out);
841 return 0;
842}
843
844/**
845 * Input:
846 * none
847 *
848 * Output:
849 * @returns 0 on success, negative error code on failure
850 */
851int journal_get_next_tag_tid(cls_method_context_t hctx, bufferlist *in,
852 bufferlist *out) {
853 uint64_t tag_tid;
854 int r = read_key(hctx, HEADER_KEY_NEXT_TAG_TID, &tag_tid);
855 if (r < 0) {
856 return r;
857 }
858
859 ::encode(tag_tid, *out);
860 return 0;
861}
862
863/**
864 * Input:
865 * @param tag_tid (uint64_t)
866 *
867 * Output:
868 * cls::journal::Tag
869 * @returns 0 on success, negative error code on failure
870 */
871int journal_get_tag(cls_method_context_t hctx, bufferlist *in,
872 bufferlist *out) {
873 uint64_t tag_tid;
874 try {
875 bufferlist::iterator iter = in->begin();
876 ::decode(tag_tid, iter);
877 } catch (const buffer::error &err) {
878 CLS_ERR("failed to decode input parameters: %s", err.what());
879 return -EINVAL;
880 }
881
882 std::string key(key_from_tag_tid(tag_tid));
883 cls::journal::Tag tag;
884 int r = read_key(hctx, key, &tag);
885 if (r < 0) {
886 return r;
887 }
888
889 ::encode(tag, *out);
890 return 0;
891}
892
893/**
894 * Input:
895 * @param tag_tid (uint64_t)
896 * @param tag_class (uint64_t)
897 * @param data (bufferlist)
898 *
899 * Output:
900 * @returns 0 on success, negative error code on failure
901 */
902int journal_tag_create(cls_method_context_t hctx, bufferlist *in,
903 bufferlist *out) {
904 uint64_t tag_tid;
905 uint64_t tag_class;
906 bufferlist data;
907 try {
908 bufferlist::iterator iter = in->begin();
909 ::decode(tag_tid, iter);
910 ::decode(tag_class, iter);
911 ::decode(data, iter);
912 } catch (const buffer::error &err) {
913 CLS_ERR("failed to decode input parameters: %s", err.what());
914 return -EINVAL;
915 }
916
917 std::string key(key_from_tag_tid(tag_tid));
918 bufferlist stored_tag_bl;
919 int r = cls_cxx_map_get_val(hctx, key, &stored_tag_bl);
920 if (r >= 0) {
921 CLS_ERR("duplicate tag id: %" PRIu64, tag_tid);
922 return -EEXIST;
923 } else if (r != -ENOENT) {
924 return r;
925 }
926
927 // verify tag tid ordering
928 uint64_t next_tag_tid;
929 r = read_key(hctx, HEADER_KEY_NEXT_TAG_TID, &next_tag_tid);
930 if (r < 0) {
931 return r;
932 }
933 if (tag_tid != next_tag_tid) {
934 CLS_LOG(5, "out-of-order tag sequence: %" PRIu64, tag_tid);
935 return -ESTALE;
936 }
937
938 uint64_t next_tag_class;
939 r = read_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, &next_tag_class);
940 if (r < 0) {
941 return r;
942 }
943
944 if (tag_class == cls::journal::Tag::TAG_CLASS_NEW) {
945 // allocate a new tag class
946 tag_class = next_tag_class;
947 r = write_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, tag_class + 1);
948 if (r < 0) {
949 return r;
950 }
951 } else {
952 // verify tag class range
953 if (tag_class >= next_tag_class) {
954 CLS_ERR("out-of-sequence tag class: %" PRIu64, tag_class);
955 return -EINVAL;
956 }
957 }
958
959 // prune expired tags
960 r = expire_tags(hctx, nullptr);
961 if (r < 0) {
962 return r;
963 }
964
965 // update tag tid sequence
966 r = write_key(hctx, HEADER_KEY_NEXT_TAG_TID, tag_tid + 1);
967 if (r < 0) {
968 return r;
969 }
970
971 // write tag structure
972 cls::journal::Tag tag(tag_tid, tag_class, data);
973 key = key_from_tag_tid(tag_tid);
974 r = write_key(hctx, key, tag);
975 if (r < 0) {
976 return r;
977 }
978 return 0;
979}
980
981/**
982 * Input:
983 * @param start_after_tag_tid (uint64_t) - first tag tid
984 * @param max_return (uint64_t) - max tags to return
985 * @param client_id (std::string) - client id filter
986 * @param tag_class (boost::optional<uint64_t> - optional tag class filter
987 *
988 * Output:
989 * std::set<cls::journal::Tag> - collection of tags
990 * @returns 0 on success, negative error code on failure
991 */
992int journal_tag_list(cls_method_context_t hctx, bufferlist *in,
993 bufferlist *out) {
994 uint64_t start_after_tag_tid;
995 uint64_t max_return;
996 std::string client_id;
997 boost::optional<uint64_t> tag_class(0);
998
999 // handle compiler false positive about use-before-init
1000 tag_class = boost::none;
1001 try {
1002 bufferlist::iterator iter = in->begin();
1003 ::decode(start_after_tag_tid, iter);
1004 ::decode(max_return, iter);
1005 ::decode(client_id, iter);
1006 ::decode(tag_class, iter);
1007 } catch (const buffer::error &err) {
1008 CLS_ERR("failed to decode input parameters: %s", err.what());
1009 return -EINVAL;
1010 }
1011
1012 // calculate the minimum tag within client's commit position
1013 uint64_t minimum_tag_tid = std::numeric_limits<uint64_t>::max();
1014 cls::journal::Client client;
1015 int r = read_key(hctx, key_from_client_id(client_id), &client);
1016 if (r < 0) {
1017 return r;
1018 }
1019
1020 for (auto object_position : client.commit_position.object_positions) {
1021 minimum_tag_tid = MIN(minimum_tag_tid, object_position.tag_tid);
1022 }
1023
1024 // compute minimum tags in use per-class
1025 std::set<cls::journal::Tag> tags;
1026 std::map<uint64_t, uint64_t> minimum_tag_class_to_tids;
1027 typedef enum { TAG_PASS_CALCULATE_MINIMUMS,
1028 TAG_PASS_LIST,
1029 TAG_PASS_DONE } TagPass;
1030 int tag_pass = (minimum_tag_tid == std::numeric_limits<uint64_t>::max() ?
1031 TAG_PASS_LIST : TAG_PASS_CALCULATE_MINIMUMS);
1032 std::string last_read = HEADER_KEY_TAG_PREFIX;
1033 do {
1034 std::map<std::string, bufferlist> vals;
c07f9fc5 1035 bool more;
7c673cae 1036 r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_TAG_PREFIX,
c07f9fc5 1037 MAX_KEYS_READ, &vals, &more);
7c673cae
FG
1038 if (r < 0 && r != -ENOENT) {
1039 CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r).c_str());
1040 return r;
1041 }
1042
1043 for (auto &val : vals) {
1044 cls::journal::Tag tag;
1045 bufferlist::iterator iter = val.second.begin();
1046 try {
1047 ::decode(tag, iter);
1048 } catch (const buffer::error &err) {
1049 CLS_ERR("error decoding tag: %s", val.first.c_str());
1050 return -EIO;
1051 }
1052
1053 if (tag_pass == TAG_PASS_CALCULATE_MINIMUMS) {
1054 minimum_tag_class_to_tids[tag.tag_class] = tag.tid;
1055
1056 // completed calculation of tag class minimums
1057 if (tag.tid >= minimum_tag_tid) {
1058 vals.clear();
3efd9988 1059 more = false;
7c673cae
FG
1060 break;
1061 }
1062 } else if (tag_pass == TAG_PASS_LIST) {
1063 if (start_after_tag_tid != 0 && tag.tid <= start_after_tag_tid) {
1064 continue;
1065 }
1066
1067 if (tag.tid >= minimum_tag_class_to_tids[tag.tag_class] &&
1068 (!tag_class || *tag_class == tag.tag_class)) {
1069 tags.insert(tag);
1070 }
1071 if (tags.size() >= max_return) {
1072 tag_pass = TAG_PASS_DONE;
1073 }
1074 }
1075 }
1076
c07f9fc5 1077 if (tag_pass != TAG_PASS_DONE && !more) {
7c673cae
FG
1078 last_read = HEADER_KEY_TAG_PREFIX;
1079 ++tag_pass;
1080 } else if (!vals.empty()) {
1081 last_read = vals.rbegin()->first;
1082 }
1083 } while (tag_pass != TAG_PASS_DONE);
1084
1085 ::encode(tags, *out);
1086 return 0;
1087}
1088
1089/**
1090 * Input:
1091 * @param soft_max_size (uint64_t)
1092 *
1093 * Output:
1094 * @returns 0 if object size less than max, negative error code otherwise
1095 */
1096int journal_object_guard_append(cls_method_context_t hctx, bufferlist *in,
1097 bufferlist *out) {
1098 uint64_t soft_max_size;
1099 try {
1100 bufferlist::iterator iter = in->begin();
1101 ::decode(soft_max_size, iter);
1102 } catch (const buffer::error &err) {
1103 CLS_ERR("failed to decode input parameters: %s", err.what());
1104 return -EINVAL;
1105 }
1106
1107 uint64_t size;
1108 time_t mtime;
1109 int r = cls_cxx_stat(hctx, &size, &mtime);
1110 if (r == -ENOENT) {
1111 return 0;
1112 } else if (r < 0) {
1113 CLS_ERR("failed to stat object: %s", cpp_strerror(r).c_str());
1114 return r;
1115 }
1116
1117 if (size >= soft_max_size) {
1118 CLS_LOG(5, "journal object full: %" PRIu64 " >= %" PRIu64,
1119 size, soft_max_size);
1120 return -EOVERFLOW;
1121 }
1122 return 0;
1123}
1124
1125CLS_INIT(journal)
1126{
1127 CLS_LOG(20, "Loaded journal class!");
1128
1129 cls_handle_t h_class;
1130 cls_method_handle_t h_journal_create;
1131 cls_method_handle_t h_journal_get_order;
1132 cls_method_handle_t h_journal_get_splay_width;
1133 cls_method_handle_t h_journal_get_pool_id;
1134 cls_method_handle_t h_journal_get_minimum_set;
1135 cls_method_handle_t h_journal_set_minimum_set;
1136 cls_method_handle_t h_journal_get_active_set;
1137 cls_method_handle_t h_journal_set_active_set;
1138 cls_method_handle_t h_journal_get_client;
1139 cls_method_handle_t h_journal_client_register;
1140 cls_method_handle_t h_journal_client_update_data;
1141 cls_method_handle_t h_journal_client_update_state;
1142 cls_method_handle_t h_journal_client_unregister;
1143 cls_method_handle_t h_journal_client_commit;
1144 cls_method_handle_t h_journal_client_list;
1145 cls_method_handle_t h_journal_get_next_tag_tid;
1146 cls_method_handle_t h_journal_get_tag;
1147 cls_method_handle_t h_journal_tag_create;
1148 cls_method_handle_t h_journal_tag_list;
1149 cls_method_handle_t h_journal_object_guard_append;
1150
1151 cls_register("journal", &h_class);
1152
1153 /// methods for journal.$journal_id objects
1154 cls_register_cxx_method(h_class, "create",
1155 CLS_METHOD_RD | CLS_METHOD_WR,
1156 journal_create, &h_journal_create);
1157 cls_register_cxx_method(h_class, "get_order",
1158 CLS_METHOD_RD,
1159 journal_get_order, &h_journal_get_order);
1160 cls_register_cxx_method(h_class, "get_splay_width",
1161 CLS_METHOD_RD,
1162 journal_get_splay_width, &h_journal_get_splay_width);
1163 cls_register_cxx_method(h_class, "get_pool_id",
1164 CLS_METHOD_RD,
1165 journal_get_pool_id, &h_journal_get_pool_id);
1166 cls_register_cxx_method(h_class, "get_minimum_set",
1167 CLS_METHOD_RD,
1168 journal_get_minimum_set,
1169 &h_journal_get_minimum_set);
1170 cls_register_cxx_method(h_class, "set_minimum_set",
1171 CLS_METHOD_RD | CLS_METHOD_WR,
1172 journal_set_minimum_set,
1173 &h_journal_set_minimum_set);
1174 cls_register_cxx_method(h_class, "get_active_set",
1175 CLS_METHOD_RD,
1176 journal_get_active_set,
1177 &h_journal_get_active_set);
1178 cls_register_cxx_method(h_class, "set_active_set",
1179 CLS_METHOD_RD | CLS_METHOD_WR,
1180 journal_set_active_set,
1181 &h_journal_set_active_set);
1182
1183 cls_register_cxx_method(h_class, "get_client",
1184 CLS_METHOD_RD,
1185 journal_get_client, &h_journal_get_client);
1186 cls_register_cxx_method(h_class, "client_register",
1187 CLS_METHOD_RD | CLS_METHOD_WR,
1188 journal_client_register, &h_journal_client_register);
1189 cls_register_cxx_method(h_class, "client_update_data",
1190 CLS_METHOD_RD | CLS_METHOD_WR,
1191 journal_client_update_data,
1192 &h_journal_client_update_data);
1193 cls_register_cxx_method(h_class, "client_update_state",
1194 CLS_METHOD_RD | CLS_METHOD_WR,
1195 journal_client_update_state,
1196 &h_journal_client_update_state);
1197 cls_register_cxx_method(h_class, "client_unregister",
1198 CLS_METHOD_RD | CLS_METHOD_WR,
1199 journal_client_unregister,
1200 &h_journal_client_unregister);
1201 cls_register_cxx_method(h_class, "client_commit",
1202 CLS_METHOD_RD | CLS_METHOD_WR,
1203 journal_client_commit, &h_journal_client_commit);
1204 cls_register_cxx_method(h_class, "client_list",
1205 CLS_METHOD_RD,
1206 journal_client_list, &h_journal_client_list);
1207
1208 cls_register_cxx_method(h_class, "get_next_tag_tid",
1209 CLS_METHOD_RD,
1210 journal_get_next_tag_tid,
1211 &h_journal_get_next_tag_tid);
1212 cls_register_cxx_method(h_class, "get_tag",
1213 CLS_METHOD_RD,
1214 journal_get_tag, &h_journal_get_tag);
1215 cls_register_cxx_method(h_class, "tag_create",
1216 CLS_METHOD_RD | CLS_METHOD_WR,
1217 journal_tag_create, &h_journal_tag_create);
1218 cls_register_cxx_method(h_class, "tag_list",
1219 CLS_METHOD_RD,
1220 journal_tag_list, &h_journal_tag_list);
1221
1222 /// methods for journal_data.$journal_id.$object_id objects
1223 cls_register_cxx_method(h_class, "guard_append",
1224 CLS_METHOD_RD | CLS_METHOD_WR,
1225 journal_object_guard_append,
1226 &h_journal_object_guard_append);
1227}