1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/int_types.h"
5 #include "include/buffer.h"
6 #include "include/encoding.h"
7 #include "common/errno.h"
8 #include "objclass/objclass.h"
9 #include "cls/journal/cls_journal_types.h"
// Maximum number of omap entries requested per cls_cxx_map_get_vals() call
// when scanning client and tag entries.
static const uint64_t MAX_KEYS_READ = 64;

// omap keys of the journal header fields.
static const std::string HEADER_KEY_ORDER          = "order";
static const std::string HEADER_KEY_SPLAY_WIDTH    = "splay_width";
static const std::string HEADER_KEY_POOL_ID        = "pool_id";
static const std::string HEADER_KEY_MINIMUM_SET    = "minimum_set";
static const std::string HEADER_KEY_ACTIVE_SET     = "active_set";
static const std::string HEADER_KEY_NEXT_TAG_TID   = "next_tag_tid";
static const std::string HEADER_KEY_NEXT_TAG_CLASS = "next_tag_class";

// omap key prefixes: a client entry is stored under prefix + client id, a tag
// entry under prefix + 16-digit hexadecimal tag tid.
static const std::string HEADER_KEY_CLIENT_PREFIX  = "client_";
static const std::string HEADER_KEY_TAG_PREFIX     = "tag_";
// Formats `value` as a fixed-width, 16-character hexadecimal string padded on
// the left with '0'.
//
// @param value number to format
// @returns the zero-padded hexadecimal representation of `value`
std::string to_hex(uint64_t value) {
  std::ostringstream oss;
  oss << std::setw(16) << std::setfill('0') << std::hex << value;
  return oss.str();
}
38 std::string
key_from_client_id(const std::string
&client_id
) {
39 return HEADER_KEY_CLIENT_PREFIX
+ client_id
;
42 std::string
key_from_tag_tid(uint64_t tag_tid
) {
43 return HEADER_KEY_TAG_PREFIX
+ to_hex(tag_tid
);
46 uint64_t tag_tid_from_key(const std::string
&key
) {
47 std::istringstream
iss(key
);
49 iss
.ignore(HEADER_KEY_TAG_PREFIX
.size()) >> std::hex
>> id
;
// read_key: looks up the omap value stored under `key` with
// cls_cxx_map_get_val(). A -ENOENT result is special-cased when
// `ignore_enoent` is set (default false); a failed lookup is logged with
// CLS_ERR. The fetched bufferlist is then walked with an iterator, and a
// buffer::error raised while decoding is caught and logged.
// NOTE(review): presumably the value is decoded into *t, but the decode call,
// the `bl` declaration, the template header and the return statements are not
// visible in this view (the embedded line numbers skip) - confirm.
54 int read_key(cls_method_context_t hctx
, const string
&key
, T
*t
,
55 bool ignore_enoent
= false) {
57 int r
= cls_cxx_map_get_val(hctx
, key
, &bl
);
58 if (r
== -ENOENT
&& ignore_enoent
) {
61 CLS_ERR("failed to get omap key: %s", key
.c_str());
66 bufferlist::iterator iter
= bl
.begin();
68 } catch (const buffer::error
&err
) {
69 CLS_ERR("failed to decode input parameters: %s", err
.what());
// write_key: stores a bufferlist under `key` with cls_cxx_map_set_val() and
// logs "failed to set omap key" through CLS_ERR on the error path.
// NOTE(review): presumably `t` is encoded into `bl` first; the encode call,
// the `bl` declaration, the error condition and the return statements are not
// visible in this view - confirm.
76 int write_key(cls_method_context_t hctx
, const string
&key
, const T
&t
) {
80 int r
= cls_cxx_map_set_val(hctx
, key
, &bl
);
82 CLS_ERR("failed to set omap key: %s", key
.c_str());
// remove_key: removes the omap entry `key` with cls_cxx_map_remove_key().
// A negative result other than -ENOENT is logged with CLS_ERR, i.e. a missing
// key is not reported as a removal failure.
// NOTE(review): the return statements are not visible in this view, so the
// value returned for -ENOENT cannot be stated from here.
88 int remove_key(cls_method_context_t hctx
, const string
&key
) {
89 int r
= cls_cxx_map_remove_key(hctx
, key
);
90 if (r
< 0 && r
!= -ENOENT
) {
91 CLS_ERR("failed to remove key: %s", key
.c_str());
// expire_tags: prunes tag entries that are no longer in use.
//
// Step 1 - reads client entries (keys under HEADER_KEY_CLIENT_PREFIX) in
// batches of MAX_KEYS_READ, decodes each into a cls::journal::Client and
// folds the tag_tid of every committed object position into
// `minimum_tag_tid` with MIN(). When `skip_client_id` is non-null, the entry
// whose key equals key_from_client_id(*skip_client_id) is special-cased.
// Step 2 - if `minimum_tag_tid` is still the uint64_t maximum, no client has
// committed yet and tags cannot be expired.
// Step 3 - reads tag entries (keys under HEADER_KEY_TAG_PREFIX) in batches,
// repeating until `tag_pass` reaches TAG_PASS_DONE. Each entry is decoded and
// its tid checked against tag_tid_from_key() of its key. During
// TAG_PASS_CALCULATE_MINIMUMS the tid is recorded per tag class; during
// TAG_PASS_SCRUB a tag whose tid is below the recorded value for its class is
// deleted through remove_key().
//
// @param hctx            object class method context
// @param skip_client_id  optional client id to special-case (may be nullptr)
// NOTE(review): many lines of this definition (loop headers, pass
// transitions, error returns, the TAG_PASS_SCRUB enumerator) are not visible
// in this view; the exact pass sequencing should be confirmed against the
// full file.
97 int expire_tags(cls_method_context_t hctx
, const std::string
*skip_client_id
) {
99 std::string skip_client_key
;
100 if (skip_client_id
!= nullptr) {
101 skip_client_key
= key_from_client_id(*skip_client_id
);
104 uint64_t minimum_tag_tid
= std::numeric_limits
<uint64_t>::max();
105 std::string last_read
= HEADER_KEY_CLIENT_PREFIX
;
108 std::map
<std::string
, bufferlist
> vals
;
109 int r
= cls_cxx_map_get_vals(hctx
, last_read
, HEADER_KEY_CLIENT_PREFIX
,
110 MAX_KEYS_READ
, &vals
, &more
);
111 if (r
< 0 && r
!= -ENOENT
) {
112 CLS_ERR("failed to retrieve registered clients: %s",
113 cpp_strerror(r
).c_str());
117 for (auto &val
: vals
) {
118 // if we are removing a client, skip its commit positions
119 if (val
.first
== skip_client_key
) {
123 cls::journal::Client client
;
124 bufferlist::iterator iter
= val
.second
.begin();
126 ::decode(client
, iter
);
127 } catch (const buffer::error
&err
) {
128 CLS_ERR("error decoding registered client: %s",
133 for (auto object_position
: client
.commit_position
.object_positions
) {
134 minimum_tag_tid
= MIN(minimum_tag_tid
, object_position
.tag_tid
);
138 last_read
= vals
.rbegin()->first
;
142 // cannot expire tags if a client hasn't committed yet
143 if (minimum_tag_tid
== std::numeric_limits
<uint64_t>::max()) {
147 // compute the minimum in-use tag for each class
148 std::map
<uint64_t, uint64_t> minimum_tag_class_to_tids
;
149 typedef enum { TAG_PASS_CALCULATE_MINIMUMS
,
151 TAG_PASS_DONE
} TagPass
;
152 int tag_pass
= TAG_PASS_CALCULATE_MINIMUMS
;
153 last_read
= HEADER_KEY_TAG_PREFIX
;
155 std::map
<std::string
, bufferlist
> vals
;
156 int r
= cls_cxx_map_get_vals(hctx
, last_read
, HEADER_KEY_TAG_PREFIX
,
157 MAX_KEYS_READ
, &vals
, &more
);
158 if (r
< 0 && r
!= -ENOENT
) {
159 CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r
).c_str());
163 for (auto &val
: vals
) {
164 cls::journal::Tag tag
;
165 bufferlist::iterator iter
= val
.second
.begin();
168 } catch (const buffer::error
&err
) {
169 CLS_ERR("error decoding tag: %s", val
.first
.c_str());
173 if (tag
.tid
!= tag_tid_from_key(val
.first
)) {
174 CLS_ERR("tag tid mismatched: %s", val
.first
.c_str());
178 if (tag_pass
== TAG_PASS_CALCULATE_MINIMUMS
) {
179 minimum_tag_class_to_tids
[tag
.tag_class
] = tag
.tid
;
180 } else if (tag_pass
== TAG_PASS_SCRUB
&&
181 tag
.tid
< minimum_tag_class_to_tids
[tag
.tag_class
]) {
182 r
= remove_key(hctx
, val
.first
);
188 if (tag
.tid
>= minimum_tag_tid
) {
189 // no need to check for tag classes beyond this point
195 if (tag_pass
!= TAG_PASS_DONE
&& !more
) {
196 last_read
= HEADER_KEY_TAG_PREFIX
;
198 } else if (!vals
.empty()) {
199 last_read
= vals
.rbegin()->first
;
201 } while (tag_pass
!= TAG_PASS_DONE
);
// get_client_list_range: lists registered clients.
//
// Calls cls_cxx_map_get_vals() for up to `max_return` entries under
// HEADER_KEY_CLIENT_PREFIX, starting after key_from_client_id(start_after)
// when `start_after` is non-empty. Each returned value is decoded into a
// cls::journal::Client and inserted into *clients. A listing failure and a
// decode failure are both logged with CLS_ERR.
//
// @param hctx         object class method context
// @param clients      output set receiving the decoded clients
// @param start_after  client id to resume after; empty leaves `last_read`
//                     empty
// @param max_return   maximum number of entries requested
// NOTE(review): the error conditions and return statements are not visible in
// this view.
205 int get_client_list_range(cls_method_context_t hctx
,
206 std::set
<cls::journal::Client
> *clients
,
207 std::string start_after
, uint64_t max_return
) {
208 std::string last_read
;
209 if (!start_after
.empty()) {
210 last_read
= key_from_client_id(start_after
);
213 std::map
<std::string
, bufferlist
> vals
;
215 int r
= cls_cxx_map_get_vals(hctx
, last_read
, HEADER_KEY_CLIENT_PREFIX
,
216 max_return
, &vals
, &more
);
218 CLS_ERR("failed to retrieve omap values: %s", cpp_strerror(r
).c_str());
222 for (std::map
<std::string
, bufferlist
>::iterator it
= vals
.begin();
223 it
!= vals
.end(); ++it
) {
225 bufferlist::iterator iter
= it
->second
.begin();
227 cls::journal::Client client
;
228 ::decode(client
, iter
);
229 clients
->insert(client
);
230 } catch (const buffer::error
&err
) {
231 CLS_ERR("could not decode client '%s': %s", it
->first
.c_str(),
// find_min_commit_position: computes the minimum commit position over the
// registered clients and stores it in *minset.
//
// Clients are fetched in batches of cls::journal::JOURNAL_MAX_RETURN through
// get_client_list_range(), resuming after the id of the last client of the
// previous batch. For each client the first object position of its commit
// position is compared with the running minimum: a smaller tag_tid, or an
// equal tag_tid with a smaller entry_tid, replaces it. A client with no
// object positions resets *minset to a default ObjectSetPosition. Iteration
// ends on an error or empty batch, or once a batch holds fewer than
// JOURNAL_MAX_RETURN clients.
//
// @param hctx    object class method context
// @param minset  output: the minimum commit position found
// NOTE(review): the declaration of `valid`, the loop header and the return
// statements are not visible in this view.
240 int find_min_commit_position(cls_method_context_t hctx
,
241 cls::journal::ObjectSetPosition
*minset
) {
244 std::string start_after
= "";
245 uint64_t tag_tid
= 0, entry_tid
= 0;
248 std::set
<cls::journal::Client
> batch
;
250 r
= get_client_list_range(hctx
, &batch
, start_after
, cls::journal::JOURNAL_MAX_RETURN
);
251 if ((r
< 0) || batch
.empty()) {
255 start_after
= batch
.rbegin()->id
;
257 // update the (minimum) commit position from this batch of clients
258 for(std::set
<cls::journal::Client
>::iterator it
= batch
.begin();
259 it
!= batch
.end(); ++it
) {
260 cls::journal::ObjectSetPosition object_set_position
= (*it
).commit_position
;
261 if (object_set_position
.object_positions
.empty()) {
262 *minset
= cls::journal::ObjectSetPosition();
265 cls::journal::ObjectPosition first
= object_set_position
.object_positions
.front();
267 // least tag_tid (or least entry_tid for matching tag_tid)
268 if (!valid
|| (tag_tid
> first
.tag_tid
) || ((tag_tid
== first
.tag_tid
) && (entry_tid
> first
.entry_tid
))) {
269 tag_tid
= first
.tag_tid
;
270 entry_tid
= first
.entry_tid
;
271 *minset
= cls::journal::ObjectSetPosition(object_set_position
);
276 // got the last batch, we're done
277 if (batch
.size() < cls::journal::JOURNAL_MAX_RETURN
) {
285 } // anonymous namespace
// journal_create: initialises the journal header.
// Decodes `order`, `splay_width` and `pool_id` (in that order) from *in. It
// then probes HEADER_KEY_ORDER with cls_cxx_map_get_val(); the "journal
// already exists" error belongs to that probe, and a result other than
// -ENOENT is handled separately. Finally it writes, via write_key(): order,
// splay width, pool id, active set and minimum set (both from `object_set`,
// which is 0), and next tag tid and next tag class (both from `tag_id`).
// NOTE(review): variable declarations, the initial value of `tag_id`, the
// per-write error checks and the return statements are not visible here.
289 * @param order (uint8_t) - bits to shift to compute the object max size
290 * @param splay width (uint8_t) - number of active journal objects
293 * @returns 0 on success, negative error code on failure
295 int journal_create(cls_method_context_t hctx
, bufferlist
*in
, bufferlist
*out
) {
300 bufferlist::iterator iter
= in
->begin();
301 ::decode(order
, iter
);
302 ::decode(splay_width
, iter
);
303 ::decode(pool_id
, iter
);
304 } catch (const buffer::error
&err
) {
305 CLS_ERR("failed to decode input parameters: %s", err
.what());
309 bufferlist stored_orderbl
;
310 int r
= cls_cxx_map_get_val(hctx
, HEADER_KEY_ORDER
, &stored_orderbl
);
312 CLS_ERR("journal already exists");
314 } else if (r
!= -ENOENT
) {
318 r
= write_key(hctx
, HEADER_KEY_ORDER
, order
);
323 r
= write_key(hctx
, HEADER_KEY_SPLAY_WIDTH
, splay_width
);
328 r
= write_key(hctx
, HEADER_KEY_POOL_ID
, pool_id
);
333 uint64_t object_set
= 0;
334 r
= write_key(hctx
, HEADER_KEY_ACTIVE_SET
, object_set
);
339 r
= write_key(hctx
, HEADER_KEY_MINIMUM_SET
, object_set
);
345 r
= write_key(hctx
, HEADER_KEY_NEXT_TAG_TID
, tag_id
);
350 r
= write_key(hctx
, HEADER_KEY_NEXT_TAG_CLASS
, tag_id
);
// journal_get_order: reads the value stored under HEADER_KEY_ORDER with
// read_key() and encodes it into *out.
// NOTE(review): the rest of the signature, the declaration of `order` and the
// error handling are not visible in this view.
363 * @returns 0 on success, negative error code on failure
365 int journal_get_order(cls_method_context_t hctx
, bufferlist
*in
,
368 int r
= read_key(hctx
, HEADER_KEY_ORDER
, &order
);
373 ::encode(order
, *out
);
// journal_get_splay_width: reads the value stored under
// HEADER_KEY_SPLAY_WIDTH with read_key() and encodes it into *out.
// NOTE(review): the rest of the signature, the declaration of `splay_width`
// and the error handling are not visible in this view.
383 * @returns 0 on success, negative error code on failure
385 int journal_get_splay_width(cls_method_context_t hctx
, bufferlist
*in
,
388 int r
= read_key(hctx
, HEADER_KEY_SPLAY_WIDTH
, &splay_width
);
393 ::encode(splay_width
, *out
);
// journal_get_pool_id: reads the value stored under HEADER_KEY_POOL_ID with
// read_key() and encodes it into *out.
// NOTE(review): the rest of the signature, the declaration (and type) of
// `pool_id` and the error handling are not visible in this view.
403 * @returns 0 on success, negative error code on failure
405 int journal_get_pool_id(cls_method_context_t hctx
, bufferlist
*in
,
408 int r
= read_key(hctx
, HEADER_KEY_POOL_ID
, &pool_id
);
413 ::encode(pool_id
, *out
);
// journal_get_minimum_set: reads the uint64_t stored under
// HEADER_KEY_MINIMUM_SET with read_key() and encodes it into *out.
// NOTE(review): the rest of the signature and the error handling are not
// visible in this view.
422 * object set (uint64_t)
423 * @returns 0 on success, negative error code on failure
425 int journal_get_minimum_set(cls_method_context_t hctx
, bufferlist
*in
,
427 uint64_t minimum_set
;
428 int r
= read_key(hctx
, HEADER_KEY_MINIMUM_SET
, &minimum_set
);
433 ::encode(minimum_set
, *out
);
// journal_set_minimum_set: updates the stored minimum object set.
// Decodes `object_set` from *in, then validates it against the stored header:
//  - the stored active set must not be below `object_set` (logged as "active
//    object set earlier than minimum");
//  - a value equal to the stored minimum set is special-cased, and a value
//    below it is logged as "object number earlier than current object".
// The new value is then written under HEADER_KEY_MINIMUM_SET with write_key().
// NOTE(review): the error codes returned, the handling of the "equal" case
// and the post-write checks are not visible in this view. The two read_key()
// output arguments below appear mis-encoded by the extraction (their leading
// characters look lost).
439 * @param object set (uint64_t)
442 * @returns 0 on success, negative error code on failure
444 int journal_set_minimum_set(cls_method_context_t hctx
, bufferlist
*in
,
448 bufferlist::iterator iter
= in
->begin();
449 ::decode(object_set
, iter
);
450 } catch (const buffer::error
&err
) {
451 CLS_ERR("failed to decode input parameters: %s", err
.what());
455 uint64_t current_active_set
;
456 int r
= read_key(hctx
, HEADER_KEY_ACTIVE_SET
, ¤t_active_set
);
461 if (current_active_set
< object_set
) {
462 CLS_ERR("active object set earlier than minimum: %" PRIu64
463 " < %" PRIu64
, current_active_set
, object_set
);
467 uint64_t current_minimum_set
;
468 r
= read_key(hctx
, HEADER_KEY_MINIMUM_SET
, ¤t_minimum_set
);
473 if (object_set
== current_minimum_set
) {
475 } else if (object_set
< current_minimum_set
) {
476 CLS_ERR("object number earlier than current object: %" PRIu64
" < %" PRIu64
,
477 object_set
, current_minimum_set
);
481 r
= write_key(hctx
, HEADER_KEY_MINIMUM_SET
, object_set
);
// journal_get_active_set: reads the value stored under HEADER_KEY_ACTIVE_SET
// with read_key() and encodes it into *out.
// NOTE(review): the rest of the signature, the declaration of `active_set`
// and the error handling are not visible in this view.
493 * object set (uint64_t)
494 * @returns 0 on success, negative error code on failure
496 int journal_get_active_set(cls_method_context_t hctx
, bufferlist
*in
,
499 int r
= read_key(hctx
, HEADER_KEY_ACTIVE_SET
, &active_set
);
504 ::encode(active_set
, *out
);
// journal_set_active_set: updates the stored active object set.
// Decodes `object_set` from *in, then validates it against the stored header:
//  - the stored minimum set must not be above `object_set` (logged as
//    "minimum object set later than active");
//  - a value equal to the stored active set is special-cased, and a value
//    below it is logged as "object number earlier than current object".
// The new value is then written under HEADER_KEY_ACTIVE_SET with write_key().
// NOTE(review): the error codes returned, the handling of the "equal" case
// and the post-write checks are not visible in this view. The two read_key()
// output arguments below appear mis-encoded by the extraction (their leading
// characters look lost).
510 * @param object set (uint64_t)
513 * @returns 0 on success, negative error code on failure
515 int journal_set_active_set(cls_method_context_t hctx
, bufferlist
*in
,
519 bufferlist::iterator iter
= in
->begin();
520 ::decode(object_set
, iter
);
521 } catch (const buffer::error
&err
) {
522 CLS_ERR("failed to decode input parameters: %s", err
.what());
526 uint64_t current_minimum_set
;
527 int r
= read_key(hctx
, HEADER_KEY_MINIMUM_SET
, ¤t_minimum_set
);
532 if (current_minimum_set
> object_set
) {
533 CLS_ERR("minimum object set later than active: %" PRIu64
534 " > %" PRIu64
, current_minimum_set
, object_set
);
538 uint64_t current_active_set
;
539 r
= read_key(hctx
, HEADER_KEY_ACTIVE_SET
, ¤t_active_set
);
544 if (object_set
== current_active_set
) {
546 } else if (object_set
< current_active_set
) {
547 CLS_ERR("object number earlier than current object: %" PRIu64
" < %" PRIu64
,
548 object_set
, current_active_set
);
552 r
= write_key(hctx
, HEADER_KEY_ACTIVE_SET
, object_set
);
// journal_get_client: reads the cls::journal::Client stored under
// key_from_client_id(id) with read_key() and encodes it into *out. A
// buffer::error raised while decoding the input is logged with CLS_ERR.
// NOTE(review): the decode of `id` from *in, the rest of the signature and
// the error returns are not visible in this view.
561 * @param id (string) - unique client id
564 * cls::journal::Client
565 * @returns 0 on success, negative error code on failure
567 int journal_get_client(cls_method_context_t hctx
, bufferlist
*in
,
571 bufferlist::iterator iter
= in
->begin();
573 } catch (const buffer::error
&err
) {
574 CLS_ERR("failed to decode input parameters: %s", err
.what());
578 std::string
key(key_from_client_id(id
));
579 cls::journal::Client client
;
580 int r
= read_key(hctx
, key
, &client
);
585 ::encode(client
, *out
);
// journal_client_register: registers a new client.
// Decodes the input (the `data` decode is visible), reads HEADER_KEY_ORDER
// through read_key(), then probes the client key with cls_cxx_map_get_val():
// the "duplicate client id" error belongs to that probe, and a result other
// than -ENOENT is handled separately. The new client is built from `id`,
// `data` and the position returned by find_min_commit_position(), and is
// stored under key_from_client_id(id) with write_key().
// NOTE(review): the decode of `id`, the exact error conditions and the
// returned codes are not visible in this view. The HEADER_KEY_ORDER read
// presumably verifies that the journal exists - confirm.
591 * @param id (string) - unique client id
592 * @param data (bufferlist) - opaque data associated to client
595 * @returns 0 on success, negative error code on failure
597 int journal_client_register(cls_method_context_t hctx
, bufferlist
*in
,
602 bufferlist::iterator iter
= in
->begin();
604 ::decode(data
, iter
);
605 } catch (const buffer::error
&err
) {
606 CLS_ERR("failed to decode input parameters: %s", err
.what());
611 int r
= read_key(hctx
, HEADER_KEY_ORDER
, &order
);
616 std::string
key(key_from_client_id(id
));
617 bufferlist stored_clientbl
;
618 r
= cls_cxx_map_get_val(hctx
, key
, &stored_clientbl
);
620 CLS_ERR("duplicate client id: %s", id
.c_str());
622 } else if (r
!= -ENOENT
) {
626 cls::journal::ObjectSetPosition minset
;
627 r
= find_min_commit_position(hctx
, &minset
);
631 cls::journal::Client
client(id
, data
, minset
);
632 r
= write_key(hctx
, key
, client
);
// journal_client_update_data: reads the client stored under
// key_from_client_id(id) with read_key() and writes it back with write_key().
// The input decode (the `data` decode is visible) logs a buffer::error with
// CLS_ERR.
// NOTE(review): presumably `data` is assigned to the client between the read
// and the write, but that statement, the decode of `id` and the error returns
// are not visible in this view - confirm.
641 * @param id (string) - unique client id
642 * @param data (bufferlist) - opaque data associated to client
645 * @returns 0 on success, negative error code on failure
647 int journal_client_update_data(cls_method_context_t hctx
, bufferlist
*in
,
652 bufferlist::iterator iter
= in
->begin();
654 ::decode(data
, iter
);
655 } catch (const buffer::error
&err
) {
656 CLS_ERR("failed to decode input parameters: %s", err
.what());
660 std::string
key(key_from_client_id(id
));
661 cls::journal::Client client
;
662 int r
= read_key(hctx
, key
, &client
);
668 r
= write_key(hctx
, key
, client
);
// journal_client_update_state: changes the state of a registered client.
// Decodes a raw state value from *in and casts it to
// cls::journal::ClientState, reads the client stored under
// key_from_client_id(id) with read_key(), assigns the new state to
// client.state and writes the client back with write_key().
// NOTE(review): the decode of `id`, the declaration of `state_raw` and the
// error returns are not visible in this view.
677 * @param id (string) - unique client id
678 * @param state (uint8_t) - client state
681 * @returns 0 on success, negative error code on failure
683 int journal_client_update_state(cls_method_context_t hctx
, bufferlist
*in
,
686 cls::journal::ClientState state
;
689 bufferlist::iterator iter
= in
->begin();
692 ::decode(state_raw
, iter
);
693 state
= static_cast<cls::journal::ClientState
>(state_raw
);
694 } catch (const buffer::error
&err
) {
695 CLS_ERR("failed to decode input parameters: %s", err
.what());
699 std::string
key(key_from_client_id(id
));
700 cls::journal::Client client
;
701 int r
= read_key(hctx
, key
, &client
);
706 client
.state
= state
;
707 r
= write_key(hctx
, key
, client
);
// journal_client_unregister: removes a registered client.
// Looks up key_from_client_id(id) with cls_cxx_map_get_val() (the "client is
// not registered" error belongs to that lookup), removes the entry with
// cls_cxx_map_remove_key(), then calls expire_tags(hctx, &id) to prune
// expired tags.
// NOTE(review): the decode of `id`, the error conditions and the returned
// codes are not visible in this view.
716 * @param id (string) - unique client id
719 * @returns 0 on success, negative error code on failure
721 int journal_client_unregister(cls_method_context_t hctx
, bufferlist
*in
,
725 bufferlist::iterator iter
= in
->begin();
727 } catch (const buffer::error
&err
) {
728 CLS_ERR("failed to decode input parameters: %s", err
.what());
732 std::string
key(key_from_client_id(id
));
734 int r
= cls_cxx_map_get_val(hctx
, key
, &bl
);
736 CLS_ERR("client is not registered: %s", id
.c_str());
740 r
= cls_cxx_map_remove_key(hctx
, key
);
742 CLS_ERR("failed to remove omap key: %s", key
.c_str());
746 // prune expired tags
747 r
= expire_tags(hctx
, &id
);
// journal_client_commit: records a client's new commit position.
// Decodes `commit_position` from *in, reads the splay width from
// HEADER_KEY_SPLAY_WIDTH and flags a position holding more object positions
// than the splay width ("too many object positions"). It then reads the
// client stored under key_from_client_id(id). A position equal to the stored
// one is special-cased; otherwise client.commit_position is replaced and the
// client is written back with write_key().
// NOTE(review): the decode of `id`, the returned error codes and the handling
// of the "unchanged" case are not visible in this view.
756 * @param client_id (uint64_t) - unique client id
757 * @param commit_position (ObjectSetPosition)
760 * @returns 0 on success, negative error code on failure
762 int journal_client_commit(cls_method_context_t hctx
, bufferlist
*in
,
765 cls::journal::ObjectSetPosition commit_position
;
767 bufferlist::iterator iter
= in
->begin();
769 ::decode(commit_position
, iter
);
770 } catch (const buffer::error
&err
) {
771 CLS_ERR("failed to decode input parameters: %s", err
.what());
776 int r
= read_key(hctx
, HEADER_KEY_SPLAY_WIDTH
, &splay_width
);
780 if (commit_position
.object_positions
.size() > splay_width
) {
781 CLS_ERR("too many object positions");
785 std::string
key(key_from_client_id(id
));
786 cls::journal::Client client
;
787 r
= read_key(hctx
, key
, &client
);
792 if (client
.commit_position
== commit_position
) {
796 client
.commit_position
= commit_position
;
797 r
= write_key(hctx
, key
, client
);
// journal_client_list: decodes `start_after` and `max_return` from *in,
// collects the matching clients with get_client_list_range() and encodes the
// resulting std::set<cls::journal::Client> into *out.
// NOTE(review): the declaration of `max_return` and the error returns are not
// visible in this view.
806 * @param start_after (string)
807 * @param max_return (uint64_t)
810 * clients (set<cls::journal::Client>) - collection of registered clients
811 * @returns 0 on success, negative error code on failure
813 int journal_client_list(cls_method_context_t hctx
, bufferlist
*in
,
815 std::string start_after
;
818 bufferlist::iterator iter
= in
->begin();
819 ::decode(start_after
, iter
);
820 ::decode(max_return
, iter
);
821 } catch (const buffer::error
&err
) {
822 CLS_ERR("failed to decode input parameters: %s", err
.what());
826 std::set
<cls::journal::Client
> clients
;
827 int r
= get_client_list_range(hctx
, &clients
, start_after
, max_return
);
831 ::encode(clients
, *out
);
// journal_get_next_tag_tid: reads the value stored under
// HEADER_KEY_NEXT_TAG_TID with read_key() and encodes it into *out.
// NOTE(review): the rest of the signature, the declaration of `tag_tid` and
// the error handling are not visible in this view.
840 * @returns 0 on success, negative error code on failure
842 int journal_get_next_tag_tid(cls_method_context_t hctx
, bufferlist
*in
,
845 int r
= read_key(hctx
, HEADER_KEY_NEXT_TAG_TID
, &tag_tid
);
850 ::encode(tag_tid
, *out
);
// journal_get_tag: decodes `tag_tid` from *in and reads the cls::journal::Tag
// stored under key_from_tag_tid(tag_tid) with read_key().
// NOTE(review): presumably the tag is then encoded into *out, but that
// statement and the error returns are not visible in this view - confirm.
856 * @param tag_tid (uint64_t)
860 * @returns 0 on success, negative error code on failure
862 int journal_get_tag(cls_method_context_t hctx
, bufferlist
*in
,
866 bufferlist::iterator iter
= in
->begin();
867 ::decode(tag_tid
, iter
);
868 } catch (const buffer::error
&err
) {
869 CLS_ERR("failed to decode input parameters: %s", err
.what());
873 std::string
key(key_from_tag_tid(tag_tid
));
874 cls::journal::Tag tag
;
875 int r
= read_key(hctx
, key
, &tag
);
// journal_tag_create: creates a new tag entry.
// Steps visible in this block:
//  1. decode `tag_tid`, `tag_class` and `data` from *in;
//  2. probe key_from_tag_tid(tag_tid) with cls_cxx_map_get_val() (the
//     "duplicate tag id" error belongs to that probe; a result other than
//     -ENOENT is handled separately);
//  3. compare `tag_tid` with the stored HEADER_KEY_NEXT_TAG_TID and log
//     "out-of-order tag sequence" at level 5 when they differ;
//  4. read HEADER_KEY_NEXT_TAG_CLASS; if the caller passed
//     cls::journal::Tag::TAG_CLASS_NEW, take the stored next class as the tag
//     class and store tag_class + 1 as the new next class;
//  5. flag a tag class that is >= the next tag class read in step 4
//     ("out-of-sequence tag class");
//  6. prune expired tags with expire_tags(hctx, nullptr);
//  7. store tag_tid + 1 under HEADER_KEY_NEXT_TAG_TID;
//  8. write the cls::journal::Tag(tag_tid, tag_class, data) under its key.
// NOTE(review): the returned error codes and the branch structure between
// steps 4 and 5 are not visible in this view.
886 * @param tag_tid (uint64_t)
887 * @param tag_class (uint64_t)
888 * @param data (bufferlist)
891 * @returns 0 on success, negative error code on failure
893 int journal_tag_create(cls_method_context_t hctx
, bufferlist
*in
,
899 bufferlist::iterator iter
= in
->begin();
900 ::decode(tag_tid
, iter
);
901 ::decode(tag_class
, iter
);
902 ::decode(data
, iter
);
903 } catch (const buffer::error
&err
) {
904 CLS_ERR("failed to decode input parameters: %s", err
.what());
908 std::string
key(key_from_tag_tid(tag_tid
));
909 bufferlist stored_tag_bl
;
910 int r
= cls_cxx_map_get_val(hctx
, key
, &stored_tag_bl
);
912 CLS_ERR("duplicate tag id: %" PRIu64
, tag_tid
);
914 } else if (r
!= -ENOENT
) {
918 // verify tag tid ordering
919 uint64_t next_tag_tid
;
920 r
= read_key(hctx
, HEADER_KEY_NEXT_TAG_TID
, &next_tag_tid
);
924 if (tag_tid
!= next_tag_tid
) {
925 CLS_LOG(5, "out-of-order tag sequence: %" PRIu64
, tag_tid
);
929 uint64_t next_tag_class
;
930 r
= read_key(hctx
, HEADER_KEY_NEXT_TAG_CLASS
, &next_tag_class
);
935 if (tag_class
== cls::journal::Tag::TAG_CLASS_NEW
) {
936 // allocate a new tag class
937 tag_class
= next_tag_class
;
938 r
= write_key(hctx
, HEADER_KEY_NEXT_TAG_CLASS
, tag_class
+ 1);
943 // verify tag class range
944 if (tag_class
>= next_tag_class
) {
945 CLS_ERR("out-of-sequence tag class: %" PRIu64
, tag_class
);
950 // prune expired tags
951 r
= expire_tags(hctx
, nullptr);
956 // update tag tid sequence
957 r
= write_key(hctx
, HEADER_KEY_NEXT_TAG_TID
, tag_tid
+ 1);
962 // write tag structure
963 cls::journal::Tag
tag(tag_tid
, tag_class
, data
);
964 key
= key_from_tag_tid(tag_tid
);
965 r
= write_key(hctx
, key
, tag
);
// journal_tag_list: lists tags for a client.
//
// Decodes `start_after_tag_tid`, `max_return`, `client_id` and an optional
// `tag_class` filter from *in. It reads the client stored under
// key_from_client_id(client_id) and takes the smallest tag_tid among its
// committed object positions as `minimum_tag_tid`. If that stays at the
// uint64_t maximum, scanning starts directly in TAG_PASS_LIST; otherwise it
// starts in TAG_PASS_CALCULATE_MINIMUMS.
//
// Tag entries (keys under HEADER_KEY_TAG_PREFIX) are then read in batches of
// MAX_KEYS_READ until `tag_pass` reaches TAG_PASS_DONE:
//  - TAG_PASS_CALCULATE_MINIMUMS records each tag's tid per tag class, with a
//    check for tid >= minimum_tag_tid that marks the calculation complete;
//  - TAG_PASS_LIST special-cases tags at or before a non-zero
//    `start_after_tag_tid`, and has a branch for tags whose tid is >= the
//    recorded value for their class and that match the optional class filter;
//    the pass ends once `tags` holds `max_return` entries.
// The collected std::set<cls::journal::Tag> is encoded into *out.
// NOTE(review): several lines (loop header, pass transitions, the insertion
// into `tags`, the TAG_PASS_LIST enumerator, error returns) are not visible
// in this view; confirm the sequencing against the full file.
974 * @param start_after_tag_tid (uint64_t) - first tag tid
975 * @param max_return (uint64_t) - max tags to return
976 * @param client_id (std::string) - client id filter
977 * @param tag_class (boost::optional<uint64_t> - optional tag class filter
980 * std::set<cls::journal::Tag> - collection of tags
981 * @returns 0 on success, negative error code on failure
983 int journal_tag_list(cls_method_context_t hctx
, bufferlist
*in
,
985 uint64_t start_after_tag_tid
;
987 std::string client_id
;
988 boost::optional
<uint64_t> tag_class(0);
990 // handle compiler false positive about use-before-init
991 tag_class
= boost::none
;
993 bufferlist::iterator iter
= in
->begin();
994 ::decode(start_after_tag_tid
, iter
);
995 ::decode(max_return
, iter
);
996 ::decode(client_id
, iter
);
997 ::decode(tag_class
, iter
);
998 } catch (const buffer::error
&err
) {
999 CLS_ERR("failed to decode input parameters: %s", err
.what());
1003 // calculate the minimum tag within client's commit position
1004 uint64_t minimum_tag_tid
= std::numeric_limits
<uint64_t>::max();
1005 cls::journal::Client client
;
1006 int r
= read_key(hctx
, key_from_client_id(client_id
), &client
);
1011 for (auto object_position
: client
.commit_position
.object_positions
) {
1012 minimum_tag_tid
= MIN(minimum_tag_tid
, object_position
.tag_tid
);
1015 // compute minimum tags in use per-class
1016 std::set
<cls::journal::Tag
> tags
;
1017 std::map
<uint64_t, uint64_t> minimum_tag_class_to_tids
;
1018 typedef enum { TAG_PASS_CALCULATE_MINIMUMS
,
1020 TAG_PASS_DONE
} TagPass
;
1021 int tag_pass
= (minimum_tag_tid
== std::numeric_limits
<uint64_t>::max() ?
1022 TAG_PASS_LIST
: TAG_PASS_CALCULATE_MINIMUMS
);
1023 std::string last_read
= HEADER_KEY_TAG_PREFIX
;
1025 std::map
<std::string
, bufferlist
> vals
;
1027 r
= cls_cxx_map_get_vals(hctx
, last_read
, HEADER_KEY_TAG_PREFIX
,
1028 MAX_KEYS_READ
, &vals
, &more
);
1029 if (r
< 0 && r
!= -ENOENT
) {
1030 CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r
).c_str());
1034 for (auto &val
: vals
) {
1035 cls::journal::Tag tag
;
1036 bufferlist::iterator iter
= val
.second
.begin();
1038 ::decode(tag
, iter
);
1039 } catch (const buffer::error
&err
) {
1040 CLS_ERR("error decoding tag: %s", val
.first
.c_str());
1044 if (tag_pass
== TAG_PASS_CALCULATE_MINIMUMS
) {
1045 minimum_tag_class_to_tids
[tag
.tag_class
] = tag
.tid
;
1047 // completed calculation of tag class minimums
1048 if (tag
.tid
>= minimum_tag_tid
) {
1052 } else if (tag_pass
== TAG_PASS_LIST
) {
1053 if (start_after_tag_tid
!= 0 && tag
.tid
<= start_after_tag_tid
) {
1057 if (tag
.tid
>= minimum_tag_class_to_tids
[tag
.tag_class
] &&
1058 (!tag_class
|| *tag_class
== tag
.tag_class
)) {
1061 if (tags
.size() >= max_return
) {
1062 tag_pass
= TAG_PASS_DONE
;
1067 if (tag_pass
!= TAG_PASS_DONE
&& !more
) {
1068 last_read
= HEADER_KEY_TAG_PREFIX
;
1070 } else if (!vals
.empty()) {
1071 last_read
= vals
.rbegin()->first
;
1073 } while (tag_pass
!= TAG_PASS_DONE
);
1075 ::encode(tags
, *out
);
// journal_object_guard_append: guards appends to a journal data object.
// Decodes `soft_max_size` from *in, obtains the object's size and mtime with
// cls_cxx_stat() (a failure is logged as "failed to stat object"), and logs
// "journal object full" at level 5 when size >= soft_max_size.
// NOTE(review): the error condition around cls_cxx_stat() (e.g. how a missing
// object is treated) and the specific error codes returned are not visible in
// this view.
1081 * @param soft_max_size (uint64_t)
1084 * @returns 0 if object size less than max, negative error code otherwise
1086 int journal_object_guard_append(cls_method_context_t hctx
, bufferlist
*in
,
1088 uint64_t soft_max_size
;
1090 bufferlist::iterator iter
= in
->begin();
1091 ::decode(soft_max_size
, iter
);
1092 } catch (const buffer::error
&err
) {
1093 CLS_ERR("failed to decode input parameters: %s", err
.what());
1099 int r
= cls_cxx_stat(hctx
, &size
, &mtime
);
1103 CLS_ERR("failed to stat object: %s", cpp_strerror(r
).c_str());
1107 if (size
>= soft_max_size
) {
1108 CLS_LOG(5, "journal object full: %" PRIu64
" >= %" PRIu64
,
1109 size
, soft_max_size
);
// Class initialisation body: logs that the class was loaded, registers the
// "journal" class with cls_register(), then registers every journal_* handler
// under its method name with cls_register_cxx_method(). The mutating methods
// (create, set_minimum_set, set_active_set, client_register,
// client_update_data, client_update_state, client_unregister, client_commit,
// tag_create, guard_append) are registered with
// CLS_METHOD_RD | CLS_METHOD_WR.
// NOTE(review): the enclosing function header and the flags argument of the
// read-only registrations (get_*, client_list, tag_list) are not visible in
// this view.
1117 CLS_LOG(20, "Loaded journal class!");
1119 cls_handle_t h_class
;
1120 cls_method_handle_t h_journal_create
;
1121 cls_method_handle_t h_journal_get_order
;
1122 cls_method_handle_t h_journal_get_splay_width
;
1123 cls_method_handle_t h_journal_get_pool_id
;
1124 cls_method_handle_t h_journal_get_minimum_set
;
1125 cls_method_handle_t h_journal_set_minimum_set
;
1126 cls_method_handle_t h_journal_get_active_set
;
1127 cls_method_handle_t h_journal_set_active_set
;
1128 cls_method_handle_t h_journal_get_client
;
1129 cls_method_handle_t h_journal_client_register
;
1130 cls_method_handle_t h_journal_client_update_data
;
1131 cls_method_handle_t h_journal_client_update_state
;
1132 cls_method_handle_t h_journal_client_unregister
;
1133 cls_method_handle_t h_journal_client_commit
;
1134 cls_method_handle_t h_journal_client_list
;
1135 cls_method_handle_t h_journal_get_next_tag_tid
;
1136 cls_method_handle_t h_journal_get_tag
;
1137 cls_method_handle_t h_journal_tag_create
;
1138 cls_method_handle_t h_journal_tag_list
;
1139 cls_method_handle_t h_journal_object_guard_append
;
1141 cls_register("journal", &h_class
);
1143 /// methods for journal.$journal_id objects
1144 cls_register_cxx_method(h_class
, "create",
1145 CLS_METHOD_RD
| CLS_METHOD_WR
,
1146 journal_create
, &h_journal_create
);
1147 cls_register_cxx_method(h_class
, "get_order",
1149 journal_get_order
, &h_journal_get_order
);
1150 cls_register_cxx_method(h_class
, "get_splay_width",
1152 journal_get_splay_width
, &h_journal_get_splay_width
);
1153 cls_register_cxx_method(h_class
, "get_pool_id",
1155 journal_get_pool_id
, &h_journal_get_pool_id
);
1156 cls_register_cxx_method(h_class
, "get_minimum_set",
1158 journal_get_minimum_set
,
1159 &h_journal_get_minimum_set
);
1160 cls_register_cxx_method(h_class
, "set_minimum_set",
1161 CLS_METHOD_RD
| CLS_METHOD_WR
,
1162 journal_set_minimum_set
,
1163 &h_journal_set_minimum_set
);
1164 cls_register_cxx_method(h_class
, "get_active_set",
1166 journal_get_active_set
,
1167 &h_journal_get_active_set
);
1168 cls_register_cxx_method(h_class
, "set_active_set",
1169 CLS_METHOD_RD
| CLS_METHOD_WR
,
1170 journal_set_active_set
,
1171 &h_journal_set_active_set
);
1173 cls_register_cxx_method(h_class
, "get_client",
1175 journal_get_client
, &h_journal_get_client
);
1176 cls_register_cxx_method(h_class
, "client_register",
1177 CLS_METHOD_RD
| CLS_METHOD_WR
,
1178 journal_client_register
, &h_journal_client_register
);
1179 cls_register_cxx_method(h_class
, "client_update_data",
1180 CLS_METHOD_RD
| CLS_METHOD_WR
,
1181 journal_client_update_data
,
1182 &h_journal_client_update_data
);
1183 cls_register_cxx_method(h_class
, "client_update_state",
1184 CLS_METHOD_RD
| CLS_METHOD_WR
,
1185 journal_client_update_state
,
1186 &h_journal_client_update_state
);
1187 cls_register_cxx_method(h_class
, "client_unregister",
1188 CLS_METHOD_RD
| CLS_METHOD_WR
,
1189 journal_client_unregister
,
1190 &h_journal_client_unregister
);
1191 cls_register_cxx_method(h_class
, "client_commit",
1192 CLS_METHOD_RD
| CLS_METHOD_WR
,
1193 journal_client_commit
, &h_journal_client_commit
);
1194 cls_register_cxx_method(h_class
, "client_list",
1196 journal_client_list
, &h_journal_client_list
);
1198 cls_register_cxx_method(h_class
, "get_next_tag_tid",
1200 journal_get_next_tag_tid
,
1201 &h_journal_get_next_tag_tid
);
1202 cls_register_cxx_method(h_class
, "get_tag",
1204 journal_get_tag
, &h_journal_get_tag
);
1205 cls_register_cxx_method(h_class
, "tag_create",
1206 CLS_METHOD_RD
| CLS_METHOD_WR
,
1207 journal_tag_create
, &h_journal_tag_create
);
1208 cls_register_cxx_method(h_class
, "tag_list",
1210 journal_tag_list
, &h_journal_tag_list
);
1212 /// methods for journal_data.$journal_id.$object_id objects
1213 cls_register_cxx_method(h_class
, "guard_append",
1214 CLS_METHOD_RD
| CLS_METHOD_WR
,
1215 journal_object_guard_append
,
1216 &h_journal_object_guard_append
);