]> git.proxmox.com Git - ceph.git/blame - ceph/src/cls/journal/cls_journal.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / cls / journal / cls_journal.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "include/int_types.h"
5#include "include/buffer.h"
6#include "include/encoding.h"
7#include "common/errno.h"
8#include "objclass/objclass.h"
9#include "cls/journal/cls_journal_types.h"
10#include <errno.h>
11#include <map>
12#include <string>
13#include <sstream>
14
15CLS_VER(1, 0)
16CLS_NAME(journal)
17
18namespace {
19
20static const uint64_t MAX_KEYS_READ = 64;
21
22static const std::string HEADER_KEY_ORDER = "order";
23static const std::string HEADER_KEY_SPLAY_WIDTH = "splay_width";
24static const std::string HEADER_KEY_POOL_ID = "pool_id";
25static const std::string HEADER_KEY_MINIMUM_SET = "minimum_set";
26static const std::string HEADER_KEY_ACTIVE_SET = "active_set";
27static const std::string HEADER_KEY_NEXT_TAG_TID = "next_tag_tid";
28static const std::string HEADER_KEY_NEXT_TAG_CLASS = "next_tag_class";
29static const std::string HEADER_KEY_CLIENT_PREFIX = "client_";
30static const std::string HEADER_KEY_TAG_PREFIX = "tag_";
31
32std::string to_hex(uint64_t value) {
33 std::ostringstream oss;
34 oss << std::setw(16) << std::setfill('0') << std::hex << value;
35 return oss.str();
36}
37
38std::string key_from_client_id(const std::string &client_id) {
39 return HEADER_KEY_CLIENT_PREFIX + client_id;
40}
41
42std::string key_from_tag_tid(uint64_t tag_tid) {
43 return HEADER_KEY_TAG_PREFIX + to_hex(tag_tid);
44}
45
46uint64_t tag_tid_from_key(const std::string &key) {
47 std::istringstream iss(key);
48 uint64_t id;
49 iss.ignore(HEADER_KEY_TAG_PREFIX.size()) >> std::hex >> id;
50 return id;
51}
52
53template <typename T>
54int read_key(cls_method_context_t hctx, const string &key, T *t,
55 bool ignore_enoent = false) {
56 bufferlist bl;
57 int r = cls_cxx_map_get_val(hctx, key, &bl);
58 if (r == -ENOENT && ignore_enoent) {
59 return 0;
60 } else if (r < 0) {
61 CLS_ERR("failed to get omap key: %s", key.c_str());
62 return r;
63 }
64
65 try {
66 bufferlist::iterator iter = bl.begin();
67 ::decode(*t, iter);
68 } catch (const buffer::error &err) {
69 CLS_ERR("failed to decode input parameters: %s", err.what());
70 return -EINVAL;
71 }
72 return 0;
73}
74
75template <typename T>
76int write_key(cls_method_context_t hctx, const string &key, const T &t) {
77 bufferlist bl;
78 ::encode(t, bl);
79
80 int r = cls_cxx_map_set_val(hctx, key, &bl);
81 if (r < 0) {
82 CLS_ERR("failed to set omap key: %s", key.c_str());
83 return r;
84 }
85 return 0;
86}
87
88int remove_key(cls_method_context_t hctx, const string &key) {
89 int r = cls_cxx_map_remove_key(hctx, key);
90 if (r < 0 && r != -ENOENT) {
91 CLS_ERR("failed to remove key: %s", key.c_str());
92 return r;
93 }
94 return 0;
95}
96
97int expire_tags(cls_method_context_t hctx, const std::string *skip_client_id) {
98
99 std::string skip_client_key;
100 if (skip_client_id != nullptr) {
101 skip_client_key = key_from_client_id(*skip_client_id);
102 }
103
104 int r;
105 uint64_t minimum_tag_tid = std::numeric_limits<uint64_t>::max();
106 std::string last_read = HEADER_KEY_CLIENT_PREFIX;
107 do {
108 std::map<std::string, bufferlist> vals;
109 r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_CLIENT_PREFIX,
110 MAX_KEYS_READ, &vals);
111 if (r < 0 && r != -ENOENT) {
112 CLS_ERR("failed to retrieve registered clients: %s",
113 cpp_strerror(r).c_str());
114 return r;
115 }
116
117 for (auto &val : vals) {
118 // if we are removing a client, skip its commit positions
119 if (val.first == skip_client_key) {
120 continue;
121 }
122
123 cls::journal::Client client;
124 bufferlist::iterator iter = val.second.begin();
125 try {
126 ::decode(client, iter);
127 } catch (const buffer::error &err) {
128 CLS_ERR("error decoding registered client: %s",
129 val.first.c_str());
130 return -EIO;
131 }
132
133 for (auto object_position : client.commit_position.object_positions) {
134 minimum_tag_tid = MIN(minimum_tag_tid, object_position.tag_tid);
135 }
136 }
137 if (!vals.empty()) {
138 last_read = vals.rbegin()->first;
139 }
140 } while (r == MAX_KEYS_READ);
141
142 // cannot expire tags if a client hasn't committed yet
143 if (minimum_tag_tid == std::numeric_limits<uint64_t>::max()) {
144 return 0;
145 }
146
147 // compute the minimum in-use tag for each class
148 std::map<uint64_t, uint64_t> minimum_tag_class_to_tids;
149 typedef enum { TAG_PASS_CALCULATE_MINIMUMS,
150 TAG_PASS_SCRUB,
151 TAG_PASS_DONE } TagPass;
152 int tag_pass = TAG_PASS_CALCULATE_MINIMUMS;
153 last_read = HEADER_KEY_TAG_PREFIX;
154 do {
155 std::map<std::string, bufferlist> vals;
156 r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_TAG_PREFIX,
157 MAX_KEYS_READ, &vals);
158 if (r < 0 && r != -ENOENT) {
159 CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r).c_str());
160 return r;
161 }
162
163 for (auto &val : vals) {
164 cls::journal::Tag tag;
165 bufferlist::iterator iter = val.second.begin();
166 try {
167 ::decode(tag, iter);
168 } catch (const buffer::error &err) {
169 CLS_ERR("error decoding tag: %s", val.first.c_str());
170 return -EIO;
171 }
172
173 if (tag.tid != tag_tid_from_key(val.first)) {
174 CLS_ERR("tag tid mismatched: %s", val.first.c_str());
175 return -EINVAL;
176 }
177
178 if (tag_pass == TAG_PASS_CALCULATE_MINIMUMS) {
179 minimum_tag_class_to_tids[tag.tag_class] = tag.tid;
180 } else if (tag_pass == TAG_PASS_SCRUB &&
181 tag.tid < minimum_tag_class_to_tids[tag.tag_class]) {
182 r = remove_key(hctx, val.first);
183 if (r < 0) {
184 return r;
185 }
186 }
187
188 if (tag.tid >= minimum_tag_tid) {
189 // no need to check for tag classes beyond this point
190 vals.clear();
191 break;
192 }
193 }
194
195 if (tag_pass != TAG_PASS_DONE && vals.size() < MAX_KEYS_READ) {
196 last_read = HEADER_KEY_TAG_PREFIX;
197 ++tag_pass;
198 } else if (!vals.empty()) {
199 last_read = vals.rbegin()->first;
200 }
201 } while (tag_pass != TAG_PASS_DONE);
202 return 0;
203}
204
205int get_client_list_range(cls_method_context_t hctx,
206 std::set<cls::journal::Client> *clients,
207 std::string start_after, uint64_t max_return) {
208 std::string last_read;
209 if (!start_after.empty()) {
210 last_read = key_from_client_id(start_after);
211 }
212
213 std::map<std::string, bufferlist> vals;
214 int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_CLIENT_PREFIX,
215 max_return, &vals);
216 if (r < 0) {
217 CLS_ERR("failed to retrieve omap values: %s", cpp_strerror(r).c_str());
218 return r;
219 }
220
221 for (std::map<std::string, bufferlist>::iterator it = vals.begin();
222 it != vals.end(); ++it) {
223 try {
224 bufferlist::iterator iter = it->second.begin();
225
226 cls::journal::Client client;
227 ::decode(client, iter);
228 clients->insert(client);
229 } catch (const buffer::error &err) {
230 CLS_ERR("could not decode client '%s': %s", it->first.c_str(),
231 err.what());
232 return -EIO;
233 }
234 }
235
236 return 0;
237}
238
239int find_min_commit_position(cls_method_context_t hctx,
240 cls::journal::ObjectSetPosition *minset) {
241 int r;
242 bool valid = false;
243 std::string start_after = "";
244 uint64_t tag_tid = 0, entry_tid = 0;
245
246 while (true) {
247 std::set<cls::journal::Client> batch;
248
249 r = get_client_list_range(hctx, &batch, start_after, cls::journal::JOURNAL_MAX_RETURN);
250 if ((r < 0) || batch.empty()) {
251 break;
252 }
253
254 start_after = batch.rbegin()->id;
255
256 // update the (minimum) commit position from this batch of clients
257 for(std::set<cls::journal::Client>::iterator it = batch.begin();
258 it != batch.end(); ++it) {
259 cls::journal::ObjectSetPosition object_set_position = (*it).commit_position;
260 if (object_set_position.object_positions.empty()) {
261 *minset = cls::journal::ObjectSetPosition();
262 break;
263 }
264 cls::journal::ObjectPosition first = object_set_position.object_positions.front();
265
266 // least tag_tid (or least entry_tid for matching tag_tid)
267 if (!valid || (tag_tid > first.tag_tid) || ((tag_tid == first.tag_tid) && (entry_tid > first.entry_tid))) {
268 tag_tid = first.tag_tid;
269 entry_tid = first.entry_tid;
270 *minset = cls::journal::ObjectSetPosition(object_set_position);
271 valid = true;
272 }
273 }
274
275 // got the last batch, we're done
276 if (batch.size() < cls::journal::JOURNAL_MAX_RETURN) {
277 break;
278 }
279 }
280
281 return r;
282}
283
284} // anonymous namespace
285
286/**
287 * Input:
288 * @param order (uint8_t) - bits to shift to compute the object max size
289 * @param splay width (uint8_t) - number of active journal objects
290 *
291 * Output:
292 * @returns 0 on success, negative error code on failure
293 */
294int journal_create(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
295 uint8_t order;
296 uint8_t splay_width;
297 int64_t pool_id;
298 try {
299 bufferlist::iterator iter = in->begin();
300 ::decode(order, iter);
301 ::decode(splay_width, iter);
302 ::decode(pool_id, iter);
303 } catch (const buffer::error &err) {
304 CLS_ERR("failed to decode input parameters: %s", err.what());
305 return -EINVAL;
306 }
307
308 bufferlist stored_orderbl;
309 int r = cls_cxx_map_get_val(hctx, HEADER_KEY_ORDER, &stored_orderbl);
310 if (r >= 0) {
311 CLS_ERR("journal already exists");
312 return -EEXIST;
313 } else if (r != -ENOENT) {
314 return r;
315 }
316
317 r = write_key(hctx, HEADER_KEY_ORDER, order);
318 if (r < 0) {
319 return r;
320 }
321
322 r = write_key(hctx, HEADER_KEY_SPLAY_WIDTH, splay_width);
323 if (r < 0) {
324 return r;
325 }
326
327 r = write_key(hctx, HEADER_KEY_POOL_ID, pool_id);
328 if (r < 0) {
329 return r;
330 }
331
332 uint64_t object_set = 0;
333 r = write_key(hctx, HEADER_KEY_ACTIVE_SET, object_set);
334 if (r < 0) {
335 return r;
336 }
337
338 r = write_key(hctx, HEADER_KEY_MINIMUM_SET, object_set);
339 if (r < 0) {
340 return r;
341 }
342
343 uint64_t tag_id = 0;
344 r = write_key(hctx, HEADER_KEY_NEXT_TAG_TID, tag_id);
345 if (r < 0) {
346 return r;
347 }
348
349 r = write_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, tag_id);
350 if (r < 0) {
351 return r;
352 }
353 return 0;
354}
355
356/**
357 * Input:
358 * none
359 *
360 * Output:
361 * order (uint8_t)
362 * @returns 0 on success, negative error code on failure
363 */
364int journal_get_order(cls_method_context_t hctx, bufferlist *in,
365 bufferlist *out) {
366 uint8_t order;
367 int r = read_key(hctx, HEADER_KEY_ORDER, &order);
368 if (r < 0) {
369 return r;
370 }
371
372 ::encode(order, *out);
373 return 0;
374}
375
376/**
377 * Input:
378 * none
379 *
380 * Output:
381 * order (uint8_t)
382 * @returns 0 on success, negative error code on failure
383 */
384int journal_get_splay_width(cls_method_context_t hctx, bufferlist *in,
385 bufferlist *out) {
386 uint8_t splay_width;
387 int r = read_key(hctx, HEADER_KEY_SPLAY_WIDTH, &splay_width);
388 if (r < 0) {
389 return r;
390 }
391
392 ::encode(splay_width, *out);
393 return 0;
394}
395
396/**
397 * Input:
398 * none
399 *
400 * Output:
401 * pool_id (int64_t)
402 * @returns 0 on success, negative error code on failure
403 */
404int journal_get_pool_id(cls_method_context_t hctx, bufferlist *in,
405 bufferlist *out) {
406 int64_t pool_id;
407 int r = read_key(hctx, HEADER_KEY_POOL_ID, &pool_id);
408 if (r < 0) {
409 return r;
410 }
411
412 ::encode(pool_id, *out);
413 return 0;
414}
415
416/**
417 * Input:
418 * none
419 *
420 * Output:
421 * object set (uint64_t)
422 * @returns 0 on success, negative error code on failure
423 */
424int journal_get_minimum_set(cls_method_context_t hctx, bufferlist *in,
425 bufferlist *out) {
426 uint64_t minimum_set;
427 int r = read_key(hctx, HEADER_KEY_MINIMUM_SET, &minimum_set);
428 if (r < 0) {
429 return r;
430 }
431
432 ::encode(minimum_set, *out);
433 return 0;
434}
435
436/**
437 * Input:
438 * @param object set (uint64_t)
439 *
440 * Output:
441 * @returns 0 on success, negative error code on failure
442 */
443int journal_set_minimum_set(cls_method_context_t hctx, bufferlist *in,
444 bufferlist *out) {
445 uint64_t object_set;
446 try {
447 bufferlist::iterator iter = in->begin();
448 ::decode(object_set, iter);
449 } catch (const buffer::error &err) {
450 CLS_ERR("failed to decode input parameters: %s", err.what());
451 return -EINVAL;
452 }
453
454 uint64_t current_active_set;
455 int r = read_key(hctx, HEADER_KEY_ACTIVE_SET, &current_active_set);
456 if (r < 0) {
457 return r;
458 }
459
460 if (current_active_set < object_set) {
461 CLS_ERR("active object set earlier than minimum: %" PRIu64
462 " < %" PRIu64, current_active_set, object_set);
463 return -EINVAL;
464 }
465
466 uint64_t current_minimum_set;
467 r = read_key(hctx, HEADER_KEY_MINIMUM_SET, &current_minimum_set);
468 if (r < 0) {
469 return r;
470 }
471
472 if (object_set == current_minimum_set) {
473 return 0;
474 } else if (object_set < current_minimum_set) {
475 CLS_ERR("object number earlier than current object: %" PRIu64 " < %" PRIu64,
476 object_set, current_minimum_set);
477 return -ESTALE;
478 }
479
480 r = write_key(hctx, HEADER_KEY_MINIMUM_SET, object_set);
481 if (r < 0) {
482 return r;
483 }
484 return 0;
485}
486
487/**
488 * Input:
489 * none
490 *
491 * Output:
492 * object set (uint64_t)
493 * @returns 0 on success, negative error code on failure
494 */
495int journal_get_active_set(cls_method_context_t hctx, bufferlist *in,
496 bufferlist *out) {
497 uint64_t active_set;
498 int r = read_key(hctx, HEADER_KEY_ACTIVE_SET, &active_set);
499 if (r < 0) {
500 return r;
501 }
502
503 ::encode(active_set, *out);
504 return 0;
505}
506
507/**
508 * Input:
509 * @param object set (uint64_t)
510 *
511 * Output:
512 * @returns 0 on success, negative error code on failure
513 */
514int journal_set_active_set(cls_method_context_t hctx, bufferlist *in,
515 bufferlist *out) {
516 uint64_t object_set;
517 try {
518 bufferlist::iterator iter = in->begin();
519 ::decode(object_set, iter);
520 } catch (const buffer::error &err) {
521 CLS_ERR("failed to decode input parameters: %s", err.what());
522 return -EINVAL;
523 }
524
525 uint64_t current_minimum_set;
526 int r = read_key(hctx, HEADER_KEY_MINIMUM_SET, &current_minimum_set);
527 if (r < 0) {
528 return r;
529 }
530
531 if (current_minimum_set > object_set) {
532 CLS_ERR("minimum object set later than active: %" PRIu64
533 " > %" PRIu64, current_minimum_set, object_set);
534 return -EINVAL;
535 }
536
537 uint64_t current_active_set;
538 r = read_key(hctx, HEADER_KEY_ACTIVE_SET, &current_active_set);
539 if (r < 0) {
540 return r;
541 }
542
543 if (object_set == current_active_set) {
544 return 0;
545 } else if (object_set < current_active_set) {
546 CLS_ERR("object number earlier than current object: %" PRIu64 " < %" PRIu64,
547 object_set, current_active_set);
548 return -ESTALE;
549 }
550
551 r = write_key(hctx, HEADER_KEY_ACTIVE_SET, object_set);
552 if (r < 0) {
553 return r;
554 }
555 return 0;
556}
557
558/**
559 * Input:
560 * @param id (string) - unique client id
561 *
562 * Output:
563 * cls::journal::Client
564 * @returns 0 on success, negative error code on failure
565 */
566int journal_get_client(cls_method_context_t hctx, bufferlist *in,
567 bufferlist *out) {
568 std::string id;
569 try {
570 bufferlist::iterator iter = in->begin();
571 ::decode(id, iter);
572 } catch (const buffer::error &err) {
573 CLS_ERR("failed to decode input parameters: %s", err.what());
574 return -EINVAL;
575 }
576
577 std::string key(key_from_client_id(id));
578 cls::journal::Client client;
579 int r = read_key(hctx, key, &client);
580 if (r < 0) {
581 return r;
582 }
583
584 ::encode(client, *out);
585 return 0;
586}
587
588/**
589 * Input:
590 * @param id (string) - unique client id
591 * @param data (bufferlist) - opaque data associated to client
592 *
593 * Output:
594 * @returns 0 on success, negative error code on failure
595 */
596int journal_client_register(cls_method_context_t hctx, bufferlist *in,
597 bufferlist *out) {
598 std::string id;
599 bufferlist data;
600 try {
601 bufferlist::iterator iter = in->begin();
602 ::decode(id, iter);
603 ::decode(data, iter);
604 } catch (const buffer::error &err) {
605 CLS_ERR("failed to decode input parameters: %s", err.what());
606 return -EINVAL;
607 }
608
609 uint8_t order;
610 int r = read_key(hctx, HEADER_KEY_ORDER, &order);
611 if (r < 0) {
612 return r;
613 }
614
615 std::string key(key_from_client_id(id));
616 bufferlist stored_clientbl;
617 r = cls_cxx_map_get_val(hctx, key, &stored_clientbl);
618 if (r >= 0) {
619 CLS_ERR("duplicate client id: %s", id.c_str());
620 return -EEXIST;
621 } else if (r != -ENOENT) {
622 return r;
623 }
624
625 cls::journal::ObjectSetPosition minset;
626 r = find_min_commit_position(hctx, &minset);
627 if (r < 0)
628 return r;
629
630 cls::journal::Client client(id, data, minset);
631 r = write_key(hctx, key, client);
632 if (r < 0) {
633 return r;
634 }
635 return 0;
636}
637
638/**
639 * Input:
640 * @param id (string) - unique client id
641 * @param data (bufferlist) - opaque data associated to client
642 *
643 * Output:
644 * @returns 0 on success, negative error code on failure
645 */
646int journal_client_update_data(cls_method_context_t hctx, bufferlist *in,
647 bufferlist *out) {
648 std::string id;
649 bufferlist data;
650 try {
651 bufferlist::iterator iter = in->begin();
652 ::decode(id, iter);
653 ::decode(data, iter);
654 } catch (const buffer::error &err) {
655 CLS_ERR("failed to decode input parameters: %s", err.what());
656 return -EINVAL;
657 }
658
659 std::string key(key_from_client_id(id));
660 cls::journal::Client client;
661 int r = read_key(hctx, key, &client);
662 if (r < 0) {
663 return r;
664 }
665
666 client.data = data;
667 r = write_key(hctx, key, client);
668 if (r < 0) {
669 return r;
670 }
671 return 0;
672}
673
674/**
675 * Input:
676 * @param id (string) - unique client id
677 * @param state (uint8_t) - client state
678 *
679 * Output:
680 * @returns 0 on success, negative error code on failure
681 */
682int journal_client_update_state(cls_method_context_t hctx, bufferlist *in,
683 bufferlist *out) {
684 std::string id;
685 cls::journal::ClientState state;
686 bufferlist data;
687 try {
688 bufferlist::iterator iter = in->begin();
689 ::decode(id, iter);
690 uint8_t state_raw;
691 ::decode(state_raw, iter);
692 state = static_cast<cls::journal::ClientState>(state_raw);
693 } catch (const buffer::error &err) {
694 CLS_ERR("failed to decode input parameters: %s", err.what());
695 return -EINVAL;
696 }
697
698 std::string key(key_from_client_id(id));
699 cls::journal::Client client;
700 int r = read_key(hctx, key, &client);
701 if (r < 0) {
702 return r;
703 }
704
705 client.state = state;
706 r = write_key(hctx, key, client);
707 if (r < 0) {
708 return r;
709 }
710 return 0;
711}
712
713/**
714 * Input:
715 * @param id (string) - unique client id
716 *
717 * Output:
718 * @returns 0 on success, negative error code on failure
719 */
720int journal_client_unregister(cls_method_context_t hctx, bufferlist *in,
721 bufferlist *out) {
722 std::string id;
723 try {
724 bufferlist::iterator iter = in->begin();
725 ::decode(id, iter);
726 } catch (const buffer::error &err) {
727 CLS_ERR("failed to decode input parameters: %s", err.what());
728 return -EINVAL;
729 }
730
731 std::string key(key_from_client_id(id));
732 bufferlist bl;
733 int r = cls_cxx_map_get_val(hctx, key, &bl);
734 if (r < 0) {
735 CLS_ERR("client is not registered: %s", id.c_str());
736 return r;
737 }
738
739 r = cls_cxx_map_remove_key(hctx, key);
740 if (r < 0) {
741 CLS_ERR("failed to remove omap key: %s", key.c_str());
742 return r;
743 }
744
745 // prune expired tags
746 r = expire_tags(hctx, &id);
747 if (r < 0) {
748 return r;
749 }
750 return 0;
751}
752
753/**
754 * Input:
755 * @param client_id (uint64_t) - unique client id
756 * @param commit_position (ObjectSetPosition)
757 *
758 * Output:
759 * @returns 0 on success, negative error code on failure
760 */
761int journal_client_commit(cls_method_context_t hctx, bufferlist *in,
762 bufferlist *out) {
763 std::string id;
764 cls::journal::ObjectSetPosition commit_position;
765 try {
766 bufferlist::iterator iter = in->begin();
767 ::decode(id, iter);
768 ::decode(commit_position, iter);
769 } catch (const buffer::error &err) {
770 CLS_ERR("failed to decode input parameters: %s", err.what());
771 return -EINVAL;
772 }
773
774 uint8_t splay_width;
775 int r = read_key(hctx, HEADER_KEY_SPLAY_WIDTH, &splay_width);
776 if (r < 0) {
777 return r;
778 }
779 if (commit_position.object_positions.size() > splay_width) {
780 CLS_ERR("too many object positions");
781 return -EINVAL;
782 }
783
784 std::string key(key_from_client_id(id));
785 cls::journal::Client client;
786 r = read_key(hctx, key, &client);
787 if (r < 0) {
788 return r;
789 }
790
791 if (client.commit_position == commit_position) {
792 return 0;
793 }
794
795 client.commit_position = commit_position;
796 r = write_key(hctx, key, client);
797 if (r < 0) {
798 return r;
799 }
800 return 0;
801}
802
803/**
804 * Input:
805 * @param start_after (string)
806 * @param max_return (uint64_t)
807 *
808 * Output:
809 * clients (set<cls::journal::Client>) - collection of registered clients
810 * @returns 0 on success, negative error code on failure
811 */
812int journal_client_list(cls_method_context_t hctx, bufferlist *in,
813 bufferlist *out) {
814 std::string start_after;
815 uint64_t max_return;
816 try {
817 bufferlist::iterator iter = in->begin();
818 ::decode(start_after, iter);
819 ::decode(max_return, iter);
820 } catch (const buffer::error &err) {
821 CLS_ERR("failed to decode input parameters: %s", err.what());
822 return -EINVAL;
823 }
824
825 std::set<cls::journal::Client> clients;
826 int r = get_client_list_range(hctx, &clients, start_after, max_return);
827 if (r < 0)
828 return r;
829
830 ::encode(clients, *out);
831 return 0;
832}
833
834/**
835 * Input:
836 * none
837 *
838 * Output:
839 * @returns 0 on success, negative error code on failure
840 */
841int journal_get_next_tag_tid(cls_method_context_t hctx, bufferlist *in,
842 bufferlist *out) {
843 uint64_t tag_tid;
844 int r = read_key(hctx, HEADER_KEY_NEXT_TAG_TID, &tag_tid);
845 if (r < 0) {
846 return r;
847 }
848
849 ::encode(tag_tid, *out);
850 return 0;
851}
852
853/**
854 * Input:
855 * @param tag_tid (uint64_t)
856 *
857 * Output:
858 * cls::journal::Tag
859 * @returns 0 on success, negative error code on failure
860 */
861int journal_get_tag(cls_method_context_t hctx, bufferlist *in,
862 bufferlist *out) {
863 uint64_t tag_tid;
864 try {
865 bufferlist::iterator iter = in->begin();
866 ::decode(tag_tid, iter);
867 } catch (const buffer::error &err) {
868 CLS_ERR("failed to decode input parameters: %s", err.what());
869 return -EINVAL;
870 }
871
872 std::string key(key_from_tag_tid(tag_tid));
873 cls::journal::Tag tag;
874 int r = read_key(hctx, key, &tag);
875 if (r < 0) {
876 return r;
877 }
878
879 ::encode(tag, *out);
880 return 0;
881}
882
883/**
884 * Input:
885 * @param tag_tid (uint64_t)
886 * @param tag_class (uint64_t)
887 * @param data (bufferlist)
888 *
889 * Output:
890 * @returns 0 on success, negative error code on failure
891 */
892int journal_tag_create(cls_method_context_t hctx, bufferlist *in,
893 bufferlist *out) {
894 uint64_t tag_tid;
895 uint64_t tag_class;
896 bufferlist data;
897 try {
898 bufferlist::iterator iter = in->begin();
899 ::decode(tag_tid, iter);
900 ::decode(tag_class, iter);
901 ::decode(data, iter);
902 } catch (const buffer::error &err) {
903 CLS_ERR("failed to decode input parameters: %s", err.what());
904 return -EINVAL;
905 }
906
907 std::string key(key_from_tag_tid(tag_tid));
908 bufferlist stored_tag_bl;
909 int r = cls_cxx_map_get_val(hctx, key, &stored_tag_bl);
910 if (r >= 0) {
911 CLS_ERR("duplicate tag id: %" PRIu64, tag_tid);
912 return -EEXIST;
913 } else if (r != -ENOENT) {
914 return r;
915 }
916
917 // verify tag tid ordering
918 uint64_t next_tag_tid;
919 r = read_key(hctx, HEADER_KEY_NEXT_TAG_TID, &next_tag_tid);
920 if (r < 0) {
921 return r;
922 }
923 if (tag_tid != next_tag_tid) {
924 CLS_LOG(5, "out-of-order tag sequence: %" PRIu64, tag_tid);
925 return -ESTALE;
926 }
927
928 uint64_t next_tag_class;
929 r = read_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, &next_tag_class);
930 if (r < 0) {
931 return r;
932 }
933
934 if (tag_class == cls::journal::Tag::TAG_CLASS_NEW) {
935 // allocate a new tag class
936 tag_class = next_tag_class;
937 r = write_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, tag_class + 1);
938 if (r < 0) {
939 return r;
940 }
941 } else {
942 // verify tag class range
943 if (tag_class >= next_tag_class) {
944 CLS_ERR("out-of-sequence tag class: %" PRIu64, tag_class);
945 return -EINVAL;
946 }
947 }
948
949 // prune expired tags
950 r = expire_tags(hctx, nullptr);
951 if (r < 0) {
952 return r;
953 }
954
955 // update tag tid sequence
956 r = write_key(hctx, HEADER_KEY_NEXT_TAG_TID, tag_tid + 1);
957 if (r < 0) {
958 return r;
959 }
960
961 // write tag structure
962 cls::journal::Tag tag(tag_tid, tag_class, data);
963 key = key_from_tag_tid(tag_tid);
964 r = write_key(hctx, key, tag);
965 if (r < 0) {
966 return r;
967 }
968 return 0;
969}
970
971/**
972 * Input:
973 * @param start_after_tag_tid (uint64_t) - first tag tid
974 * @param max_return (uint64_t) - max tags to return
975 * @param client_id (std::string) - client id filter
976 * @param tag_class (boost::optional<uint64_t> - optional tag class filter
977 *
978 * Output:
979 * std::set<cls::journal::Tag> - collection of tags
980 * @returns 0 on success, negative error code on failure
981 */
982int journal_tag_list(cls_method_context_t hctx, bufferlist *in,
983 bufferlist *out) {
984 uint64_t start_after_tag_tid;
985 uint64_t max_return;
986 std::string client_id;
987 boost::optional<uint64_t> tag_class(0);
988
989 // handle compiler false positive about use-before-init
990 tag_class = boost::none;
991 try {
992 bufferlist::iterator iter = in->begin();
993 ::decode(start_after_tag_tid, iter);
994 ::decode(max_return, iter);
995 ::decode(client_id, iter);
996 ::decode(tag_class, iter);
997 } catch (const buffer::error &err) {
998 CLS_ERR("failed to decode input parameters: %s", err.what());
999 return -EINVAL;
1000 }
1001
1002 // calculate the minimum tag within client's commit position
1003 uint64_t minimum_tag_tid = std::numeric_limits<uint64_t>::max();
1004 cls::journal::Client client;
1005 int r = read_key(hctx, key_from_client_id(client_id), &client);
1006 if (r < 0) {
1007 return r;
1008 }
1009
1010 for (auto object_position : client.commit_position.object_positions) {
1011 minimum_tag_tid = MIN(minimum_tag_tid, object_position.tag_tid);
1012 }
1013
1014 // compute minimum tags in use per-class
1015 std::set<cls::journal::Tag> tags;
1016 std::map<uint64_t, uint64_t> minimum_tag_class_to_tids;
1017 typedef enum { TAG_PASS_CALCULATE_MINIMUMS,
1018 TAG_PASS_LIST,
1019 TAG_PASS_DONE } TagPass;
1020 int tag_pass = (minimum_tag_tid == std::numeric_limits<uint64_t>::max() ?
1021 TAG_PASS_LIST : TAG_PASS_CALCULATE_MINIMUMS);
1022 std::string last_read = HEADER_KEY_TAG_PREFIX;
1023 do {
1024 std::map<std::string, bufferlist> vals;
1025 r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_TAG_PREFIX,
1026 MAX_KEYS_READ, &vals);
1027 if (r < 0 && r != -ENOENT) {
1028 CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r).c_str());
1029 return r;
1030 }
1031
1032 for (auto &val : vals) {
1033 cls::journal::Tag tag;
1034 bufferlist::iterator iter = val.second.begin();
1035 try {
1036 ::decode(tag, iter);
1037 } catch (const buffer::error &err) {
1038 CLS_ERR("error decoding tag: %s", val.first.c_str());
1039 return -EIO;
1040 }
1041
1042 if (tag_pass == TAG_PASS_CALCULATE_MINIMUMS) {
1043 minimum_tag_class_to_tids[tag.tag_class] = tag.tid;
1044
1045 // completed calculation of tag class minimums
1046 if (tag.tid >= minimum_tag_tid) {
1047 vals.clear();
1048 break;
1049 }
1050 } else if (tag_pass == TAG_PASS_LIST) {
1051 if (start_after_tag_tid != 0 && tag.tid <= start_after_tag_tid) {
1052 continue;
1053 }
1054
1055 if (tag.tid >= minimum_tag_class_to_tids[tag.tag_class] &&
1056 (!tag_class || *tag_class == tag.tag_class)) {
1057 tags.insert(tag);
1058 }
1059 if (tags.size() >= max_return) {
1060 tag_pass = TAG_PASS_DONE;
1061 }
1062 }
1063 }
1064
1065 if (tag_pass != TAG_PASS_DONE && vals.size() < MAX_KEYS_READ) {
1066 last_read = HEADER_KEY_TAG_PREFIX;
1067 ++tag_pass;
1068 } else if (!vals.empty()) {
1069 last_read = vals.rbegin()->first;
1070 }
1071 } while (tag_pass != TAG_PASS_DONE);
1072
1073 ::encode(tags, *out);
1074 return 0;
1075}
1076
1077/**
1078 * Input:
1079 * @param soft_max_size (uint64_t)
1080 *
1081 * Output:
1082 * @returns 0 if object size less than max, negative error code otherwise
1083 */
1084int journal_object_guard_append(cls_method_context_t hctx, bufferlist *in,
1085 bufferlist *out) {
1086 uint64_t soft_max_size;
1087 try {
1088 bufferlist::iterator iter = in->begin();
1089 ::decode(soft_max_size, iter);
1090 } catch (const buffer::error &err) {
1091 CLS_ERR("failed to decode input parameters: %s", err.what());
1092 return -EINVAL;
1093 }
1094
1095 uint64_t size;
1096 time_t mtime;
1097 int r = cls_cxx_stat(hctx, &size, &mtime);
1098 if (r == -ENOENT) {
1099 return 0;
1100 } else if (r < 0) {
1101 CLS_ERR("failed to stat object: %s", cpp_strerror(r).c_str());
1102 return r;
1103 }
1104
1105 if (size >= soft_max_size) {
1106 CLS_LOG(5, "journal object full: %" PRIu64 " >= %" PRIu64,
1107 size, soft_max_size);
1108 return -EOVERFLOW;
1109 }
1110 return 0;
1111}
1112
1113CLS_INIT(journal)
1114{
1115 CLS_LOG(20, "Loaded journal class!");
1116
1117 cls_handle_t h_class;
1118 cls_method_handle_t h_journal_create;
1119 cls_method_handle_t h_journal_get_order;
1120 cls_method_handle_t h_journal_get_splay_width;
1121 cls_method_handle_t h_journal_get_pool_id;
1122 cls_method_handle_t h_journal_get_minimum_set;
1123 cls_method_handle_t h_journal_set_minimum_set;
1124 cls_method_handle_t h_journal_get_active_set;
1125 cls_method_handle_t h_journal_set_active_set;
1126 cls_method_handle_t h_journal_get_client;
1127 cls_method_handle_t h_journal_client_register;
1128 cls_method_handle_t h_journal_client_update_data;
1129 cls_method_handle_t h_journal_client_update_state;
1130 cls_method_handle_t h_journal_client_unregister;
1131 cls_method_handle_t h_journal_client_commit;
1132 cls_method_handle_t h_journal_client_list;
1133 cls_method_handle_t h_journal_get_next_tag_tid;
1134 cls_method_handle_t h_journal_get_tag;
1135 cls_method_handle_t h_journal_tag_create;
1136 cls_method_handle_t h_journal_tag_list;
1137 cls_method_handle_t h_journal_object_guard_append;
1138
1139 cls_register("journal", &h_class);
1140
1141 /// methods for journal.$journal_id objects
1142 cls_register_cxx_method(h_class, "create",
1143 CLS_METHOD_RD | CLS_METHOD_WR,
1144 journal_create, &h_journal_create);
1145 cls_register_cxx_method(h_class, "get_order",
1146 CLS_METHOD_RD,
1147 journal_get_order, &h_journal_get_order);
1148 cls_register_cxx_method(h_class, "get_splay_width",
1149 CLS_METHOD_RD,
1150 journal_get_splay_width, &h_journal_get_splay_width);
1151 cls_register_cxx_method(h_class, "get_pool_id",
1152 CLS_METHOD_RD,
1153 journal_get_pool_id, &h_journal_get_pool_id);
1154 cls_register_cxx_method(h_class, "get_minimum_set",
1155 CLS_METHOD_RD,
1156 journal_get_minimum_set,
1157 &h_journal_get_minimum_set);
1158 cls_register_cxx_method(h_class, "set_minimum_set",
1159 CLS_METHOD_RD | CLS_METHOD_WR,
1160 journal_set_minimum_set,
1161 &h_journal_set_minimum_set);
1162 cls_register_cxx_method(h_class, "get_active_set",
1163 CLS_METHOD_RD,
1164 journal_get_active_set,
1165 &h_journal_get_active_set);
1166 cls_register_cxx_method(h_class, "set_active_set",
1167 CLS_METHOD_RD | CLS_METHOD_WR,
1168 journal_set_active_set,
1169 &h_journal_set_active_set);
1170
1171 cls_register_cxx_method(h_class, "get_client",
1172 CLS_METHOD_RD,
1173 journal_get_client, &h_journal_get_client);
1174 cls_register_cxx_method(h_class, "client_register",
1175 CLS_METHOD_RD | CLS_METHOD_WR,
1176 journal_client_register, &h_journal_client_register);
1177 cls_register_cxx_method(h_class, "client_update_data",
1178 CLS_METHOD_RD | CLS_METHOD_WR,
1179 journal_client_update_data,
1180 &h_journal_client_update_data);
1181 cls_register_cxx_method(h_class, "client_update_state",
1182 CLS_METHOD_RD | CLS_METHOD_WR,
1183 journal_client_update_state,
1184 &h_journal_client_update_state);
1185 cls_register_cxx_method(h_class, "client_unregister",
1186 CLS_METHOD_RD | CLS_METHOD_WR,
1187 journal_client_unregister,
1188 &h_journal_client_unregister);
1189 cls_register_cxx_method(h_class, "client_commit",
1190 CLS_METHOD_RD | CLS_METHOD_WR,
1191 journal_client_commit, &h_journal_client_commit);
1192 cls_register_cxx_method(h_class, "client_list",
1193 CLS_METHOD_RD,
1194 journal_client_list, &h_journal_client_list);
1195
1196 cls_register_cxx_method(h_class, "get_next_tag_tid",
1197 CLS_METHOD_RD,
1198 journal_get_next_tag_tid,
1199 &h_journal_get_next_tag_tid);
1200 cls_register_cxx_method(h_class, "get_tag",
1201 CLS_METHOD_RD,
1202 journal_get_tag, &h_journal_get_tag);
1203 cls_register_cxx_method(h_class, "tag_create",
1204 CLS_METHOD_RD | CLS_METHOD_WR,
1205 journal_tag_create, &h_journal_tag_create);
1206 cls_register_cxx_method(h_class, "tag_list",
1207 CLS_METHOD_RD,
1208 journal_tag_list, &h_journal_tag_list);
1209
1210 /// methods for journal_data.$journal_id.$object_id objects
1211 cls_register_cxx_method(h_class, "guard_append",
1212 CLS_METHOD_RD | CLS_METHOD_WR,
1213 journal_object_guard_append,
1214 &h_journal_object_guard_append);
1215}