1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/int_types.h"
5 #include "include/buffer.h"
6 #include "include/encoding.h"
7 #include "common/errno.h"
8 #include "objclass/objclass.h"
9 #include "cls/journal/cls_journal_types.h"
// Maximum number of omap entries requested per cls_cxx_map_get_vals() call
// when paging through registered clients or tags.
static const uint64_t MAX_KEYS_READ = 64;

// Omap keys holding the individual journal header fields.
static const std::string HEADER_KEY_ORDER = "order";
static const std::string HEADER_KEY_SPLAY_WIDTH = "splay_width";
static const std::string HEADER_KEY_POOL_ID = "pool_id";
static const std::string HEADER_KEY_MINIMUM_SET = "minimum_set";
static const std::string HEADER_KEY_ACTIVE_SET = "active_set";
static const std::string HEADER_KEY_NEXT_TAG_TID = "next_tag_tid";
static const std::string HEADER_KEY_NEXT_TAG_CLASS = "next_tag_class";

// Omap key prefixes: a client id (see key_from_client_id) or a hex-encoded
// tag tid (see key_from_tag_tid) is appended to form the full key.
static const std::string HEADER_KEY_CLIENT_PREFIX = "client_";
static const std::string HEADER_KEY_TAG_PREFIX = "tag_";
// Formats `value` as exactly 16 hexadecimal digits, left-padded with '0'.
// Because the width is fixed, the resulting strings compare
// lexicographically in the same order as the numeric values.
//
// @param value number to format
// @returns the zero-padded, lowercase hexadecimal representation
std::string to_hex(uint64_t value) {
  std::ostringstream oss;
  oss << std::setw(16) << std::setfill('0') << std::hex << value;
  return oss.str();
}
38 std::string
key_from_client_id(const std::string
&client_id
) {
39 return HEADER_KEY_CLIENT_PREFIX
+ client_id
;
42 std::string
key_from_tag_tid(uint64_t tag_tid
) {
43 return HEADER_KEY_TAG_PREFIX
+ to_hex(tag_tid
);
46 uint64_t tag_tid_from_key(const std::string
&key
) {
47 std::istringstream
iss(key
);
49 iss
.ignore(HEADER_KEY_TAG_PREFIX
.size()) >> std::hex
>> id
;
54 int read_key(cls_method_context_t hctx
, const string
&key
, T
*t
,
55 bool ignore_enoent
= false) {
57 int r
= cls_cxx_map_get_val(hctx
, key
, &bl
);
64 CLS_ERR("failed to get omap key: %s", key
.c_str());
69 auto iter
= bl
.cbegin();
71 } catch (const buffer::error
&err
) {
72 CLS_ERR("failed to decode input parameters: %s", err
.what());
79 int write_key(cls_method_context_t hctx
, const string
&key
, const T
&t
) {
83 int r
= cls_cxx_map_set_val(hctx
, key
, &bl
);
85 CLS_ERR("failed to set omap key: %s", key
.c_str());
91 int remove_key(cls_method_context_t hctx
, const string
&key
) {
92 int r
= cls_cxx_map_remove_key(hctx
, key
);
93 if (r
< 0 && r
!= -ENOENT
) {
94 CLS_ERR("failed to remove key: %s", key
.c_str());
100 int expire_tags(cls_method_context_t hctx
, const std::string
*skip_client_id
) {
102 std::string skip_client_key
;
103 if (skip_client_id
!= nullptr) {
104 skip_client_key
= key_from_client_id(*skip_client_id
);
107 uint64_t minimum_tag_tid
= std::numeric_limits
<uint64_t>::max();
108 std::string last_read
= "";
111 std::map
<std::string
, bufferlist
> vals
;
112 int r
= cls_cxx_map_get_vals(hctx
, last_read
, HEADER_KEY_CLIENT_PREFIX
,
113 MAX_KEYS_READ
, &vals
, &more
);
114 if (r
< 0 && r
!= -ENOENT
) {
115 CLS_ERR("failed to retrieve registered clients: %s",
116 cpp_strerror(r
).c_str());
120 for (auto &val
: vals
) {
121 // if we are removing a client, skip its commit positions
122 if (val
.first
== skip_client_key
) {
126 cls::journal::Client client
;
127 auto iter
= val
.second
.cbegin();
129 decode(client
, iter
);
130 } catch (const buffer::error
&err
) {
131 CLS_ERR("error decoding registered client: %s",
136 if (client
.state
== cls::journal::CLIENT_STATE_DISCONNECTED
) {
137 // don't allow a disconnected client to prevent pruning
139 } else if (client
.commit_position
.object_positions
.empty()) {
140 // cannot prune if one or more clients has an empty commit history
144 for (auto object_position
: client
.commit_position
.object_positions
) {
145 minimum_tag_tid
= std::min(minimum_tag_tid
, object_position
.tag_tid
);
149 last_read
= vals
.rbegin()->first
;
153 // cannot expire tags if a client hasn't committed yet
154 if (minimum_tag_tid
== std::numeric_limits
<uint64_t>::max()) {
158 // compute the minimum in-use tag for each class
159 std::map
<uint64_t, uint64_t> minimum_tag_class_to_tids
;
160 typedef enum { TAG_PASS_CALCULATE_MINIMUMS
,
162 TAG_PASS_DONE
} TagPass
;
163 int tag_pass
= TAG_PASS_CALCULATE_MINIMUMS
;
164 last_read
= HEADER_KEY_TAG_PREFIX
;
166 std::map
<std::string
, bufferlist
> vals
;
167 int r
= cls_cxx_map_get_vals(hctx
, last_read
, HEADER_KEY_TAG_PREFIX
,
168 MAX_KEYS_READ
, &vals
, &more
);
169 if (r
< 0 && r
!= -ENOENT
) {
170 CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r
).c_str());
174 for (auto &val
: vals
) {
175 cls::journal::Tag tag
;
176 auto iter
= val
.second
.cbegin();
179 } catch (const buffer::error
&err
) {
180 CLS_ERR("error decoding tag: %s", val
.first
.c_str());
184 if (tag
.tid
!= tag_tid_from_key(val
.first
)) {
185 CLS_ERR("tag tid mismatched: %s", val
.first
.c_str());
189 if (tag_pass
== TAG_PASS_CALCULATE_MINIMUMS
) {
190 minimum_tag_class_to_tids
[tag
.tag_class
] = tag
.tid
;
191 } else if (tag_pass
== TAG_PASS_SCRUB
&&
192 tag
.tid
< minimum_tag_class_to_tids
[tag
.tag_class
]) {
193 r
= remove_key(hctx
, val
.first
);
199 if (tag
.tid
>= minimum_tag_tid
) {
200 // no need to check for tag classes beyond this point
207 if (tag_pass
!= TAG_PASS_DONE
&& !more
) {
208 last_read
= HEADER_KEY_TAG_PREFIX
;
210 } else if (!vals
.empty()) {
211 last_read
= vals
.rbegin()->first
;
213 } while (tag_pass
!= TAG_PASS_DONE
);
217 int get_client_list_range(cls_method_context_t hctx
,
218 std::set
<cls::journal::Client
> *clients
,
219 std::string start_after
, uint64_t max_return
) {
220 std::string last_read
;
221 if (!start_after
.empty()) {
222 last_read
= key_from_client_id(start_after
);
225 std::map
<std::string
, bufferlist
> vals
;
227 int r
= cls_cxx_map_get_vals(hctx
, last_read
, HEADER_KEY_CLIENT_PREFIX
,
228 max_return
, &vals
, &more
);
230 CLS_ERR("failed to retrieve omap values: %s", cpp_strerror(r
).c_str());
234 for (std::map
<std::string
, bufferlist
>::iterator it
= vals
.begin();
235 it
!= vals
.end(); ++it
) {
237 auto iter
= it
->second
.cbegin();
239 cls::journal::Client client
;
240 decode(client
, iter
);
241 clients
->insert(client
);
242 } catch (const buffer::error
&err
) {
243 CLS_ERR("could not decode client '%s': %s", it
->first
.c_str(),
252 int find_min_commit_position(cls_method_context_t hctx
,
253 cls::journal::ObjectSetPosition
*minset
) {
256 std::string start_after
= "";
257 uint64_t tag_tid
= 0, entry_tid
= 0;
260 std::set
<cls::journal::Client
> batch
;
262 r
= get_client_list_range(hctx
, &batch
, start_after
, cls::journal::JOURNAL_MAX_RETURN
);
263 if ((r
< 0) || batch
.empty()) {
267 start_after
= batch
.rbegin()->id
;
269 // update the (minimum) commit position from this batch of clients
270 for(std::set
<cls::journal::Client
>::iterator it
= batch
.begin();
271 it
!= batch
.end(); ++it
) {
272 cls::journal::ObjectSetPosition object_set_position
= (*it
).commit_position
;
273 if (object_set_position
.object_positions
.empty()) {
274 *minset
= cls::journal::ObjectSetPosition();
277 cls::journal::ObjectPosition first
= object_set_position
.object_positions
.front();
279 // least tag_tid (or least entry_tid for matching tag_tid)
280 if (!valid
|| (tag_tid
> first
.tag_tid
) || ((tag_tid
== first
.tag_tid
) && (entry_tid
> first
.entry_tid
))) {
281 tag_tid
= first
.tag_tid
;
282 entry_tid
= first
.entry_tid
;
283 *minset
= cls::journal::ObjectSetPosition(object_set_position
);
288 // got the last batch, we're done
289 if (batch
.size() < cls::journal::JOURNAL_MAX_RETURN
) {
297 } // anonymous namespace
301 * @param order (uint8_t) - bits to shift to compute the object max size
302 * @param splay width (uint8_t) - number of active journal objects
305 * @returns 0 on success, negative error code on failure
307 int journal_create(cls_method_context_t hctx
, bufferlist
*in
, bufferlist
*out
) {
312 auto iter
= in
->cbegin();
314 decode(splay_width
, iter
);
315 decode(pool_id
, iter
);
316 } catch (const buffer::error
&err
) {
317 CLS_ERR("failed to decode input parameters: %s", err
.what());
321 bufferlist stored_orderbl
;
322 int r
= cls_cxx_map_get_val(hctx
, HEADER_KEY_ORDER
, &stored_orderbl
);
324 CLS_ERR("journal already exists");
326 } else if (r
!= -ENOENT
) {
330 r
= write_key(hctx
, HEADER_KEY_ORDER
, order
);
335 r
= write_key(hctx
, HEADER_KEY_SPLAY_WIDTH
, splay_width
);
340 r
= write_key(hctx
, HEADER_KEY_POOL_ID
, pool_id
);
345 uint64_t object_set
= 0;
346 r
= write_key(hctx
, HEADER_KEY_ACTIVE_SET
, object_set
);
351 r
= write_key(hctx
, HEADER_KEY_MINIMUM_SET
, object_set
);
357 r
= write_key(hctx
, HEADER_KEY_NEXT_TAG_TID
, tag_id
);
362 r
= write_key(hctx
, HEADER_KEY_NEXT_TAG_CLASS
, tag_id
);
375 * @returns 0 on success, negative error code on failure
377 int journal_get_order(cls_method_context_t hctx
, bufferlist
*in
,
380 int r
= read_key(hctx
, HEADER_KEY_ORDER
, &order
);
394 * splay_width (uint8_t)
395 * @returns 0 on success, negative error code on failure
397 int journal_get_splay_width(cls_method_context_t hctx
, bufferlist
*in
,
400 int r
= read_key(hctx
, HEADER_KEY_SPLAY_WIDTH
, &splay_width
);
405 encode(splay_width
, *out
);
415 * @returns 0 on success, negative error code on failure
417 int journal_get_pool_id(cls_method_context_t hctx
, bufferlist
*in
,
420 int r
= read_key(hctx
, HEADER_KEY_POOL_ID
, &pool_id
);
425 encode(pool_id
, *out
);
434 * object set (uint64_t)
435 * @returns 0 on success, negative error code on failure
437 int journal_get_minimum_set(cls_method_context_t hctx
, bufferlist
*in
,
439 uint64_t minimum_set
;
440 int r
= read_key(hctx
, HEADER_KEY_MINIMUM_SET
, &minimum_set
);
445 encode(minimum_set
, *out
);
451 * @param object set (uint64_t)
454 * @returns 0 on success, negative error code on failure
456 int journal_set_minimum_set(cls_method_context_t hctx
, bufferlist
*in
,
460 auto iter
= in
->cbegin();
461 decode(object_set
, iter
);
462 } catch (const buffer::error
&err
) {
463 CLS_ERR("failed to decode input parameters: %s", err
.what());
467 uint64_t current_active_set
;
468 int r
= read_key(hctx
, HEADER_KEY_ACTIVE_SET
, ¤t_active_set
);
473 if (current_active_set
< object_set
) {
474 CLS_LOG(10, "active object set earlier than minimum: %" PRIu64
475 " < %" PRIu64
, current_active_set
, object_set
);
479 uint64_t current_minimum_set
;
480 r
= read_key(hctx
, HEADER_KEY_MINIMUM_SET
, ¤t_minimum_set
);
485 if (object_set
== current_minimum_set
) {
487 } else if (object_set
< current_minimum_set
) {
488 CLS_ERR("object number earlier than current object: %" PRIu64
" < %" PRIu64
,
489 object_set
, current_minimum_set
);
493 r
= write_key(hctx
, HEADER_KEY_MINIMUM_SET
, object_set
);
505 * object set (uint64_t)
506 * @returns 0 on success, negative error code on failure
508 int journal_get_active_set(cls_method_context_t hctx
, bufferlist
*in
,
511 int r
= read_key(hctx
, HEADER_KEY_ACTIVE_SET
, &active_set
);
516 encode(active_set
, *out
);
522 * @param object set (uint64_t)
525 * @returns 0 on success, negative error code on failure
527 int journal_set_active_set(cls_method_context_t hctx
, bufferlist
*in
,
531 auto iter
= in
->cbegin();
532 decode(object_set
, iter
);
533 } catch (const buffer::error
&err
) {
534 CLS_ERR("failed to decode input parameters: %s", err
.what());
538 uint64_t current_minimum_set
;
539 int r
= read_key(hctx
, HEADER_KEY_MINIMUM_SET
, ¤t_minimum_set
);
544 if (current_minimum_set
> object_set
) {
545 CLS_ERR("minimum object set later than active: %" PRIu64
546 " > %" PRIu64
, current_minimum_set
, object_set
);
550 uint64_t current_active_set
;
551 r
= read_key(hctx
, HEADER_KEY_ACTIVE_SET
, ¤t_active_set
);
556 if (object_set
== current_active_set
) {
558 } else if (object_set
< current_active_set
) {
559 CLS_ERR("object number earlier than current object: %" PRIu64
" < %" PRIu64
,
560 object_set
, current_active_set
);
564 r
= write_key(hctx
, HEADER_KEY_ACTIVE_SET
, object_set
);
573 * @param id (string) - unique client id
576 * cls::journal::Client
577 * @returns 0 on success, negative error code on failure
579 int journal_get_client(cls_method_context_t hctx
, bufferlist
*in
,
583 auto iter
= in
->cbegin();
585 } catch (const buffer::error
&err
) {
586 CLS_ERR("failed to decode input parameters: %s", err
.what());
590 std::string
key(key_from_client_id(id
));
591 cls::journal::Client client
;
592 int r
= read_key(hctx
, key
, &client
);
597 encode(client
, *out
);
603 * @param id (string) - unique client id
604 * @param data (bufferlist) - opaque data associated to client
607 * @returns 0 on success, negative error code on failure
609 int journal_client_register(cls_method_context_t hctx
, bufferlist
*in
,
614 auto iter
= in
->cbegin();
617 } catch (const buffer::error
&err
) {
618 CLS_ERR("failed to decode input parameters: %s", err
.what());
623 int r
= read_key(hctx
, HEADER_KEY_ORDER
, &order
);
628 std::string
key(key_from_client_id(id
));
629 bufferlist stored_clientbl
;
630 r
= cls_cxx_map_get_val(hctx
, key
, &stored_clientbl
);
632 CLS_ERR("duplicate client id: %s", id
.c_str());
634 } else if (r
!= -ENOENT
) {
638 cls::journal::ObjectSetPosition minset
;
639 r
= find_min_commit_position(hctx
, &minset
);
643 cls::journal::Client
client(id
, data
, minset
);
644 r
= write_key(hctx
, key
, client
);
653 * @param id (string) - unique client id
654 * @param data (bufferlist) - opaque data associated to client
657 * @returns 0 on success, negative error code on failure
659 int journal_client_update_data(cls_method_context_t hctx
, bufferlist
*in
,
664 auto iter
= in
->cbegin();
667 } catch (const buffer::error
&err
) {
668 CLS_ERR("failed to decode input parameters: %s", err
.what());
672 std::string
key(key_from_client_id(id
));
673 cls::journal::Client client
;
674 int r
= read_key(hctx
, key
, &client
);
680 r
= write_key(hctx
, key
, client
);
689 * @param id (string) - unique client id
690 * @param state (uint8_t) - client state
693 * @returns 0 on success, negative error code on failure
695 int journal_client_update_state(cls_method_context_t hctx
, bufferlist
*in
,
698 cls::journal::ClientState state
;
701 auto iter
= in
->cbegin();
704 decode(state_raw
, iter
);
705 state
= static_cast<cls::journal::ClientState
>(state_raw
);
706 } catch (const buffer::error
&err
) {
707 CLS_ERR("failed to decode input parameters: %s", err
.what());
711 std::string
key(key_from_client_id(id
));
712 cls::journal::Client client
;
713 int r
= read_key(hctx
, key
, &client
);
718 client
.state
= state
;
719 r
= write_key(hctx
, key
, client
);
728 * @param id (string) - unique client id
731 * @returns 0 on success, negative error code on failure
733 int journal_client_unregister(cls_method_context_t hctx
, bufferlist
*in
,
737 auto iter
= in
->cbegin();
739 } catch (const buffer::error
&err
) {
740 CLS_ERR("failed to decode input parameters: %s", err
.what());
744 std::string
key(key_from_client_id(id
));
746 int r
= cls_cxx_map_get_val(hctx
, key
, &bl
);
748 CLS_ERR("client is not registered: %s", id
.c_str());
752 r
= cls_cxx_map_remove_key(hctx
, key
);
754 CLS_ERR("failed to remove omap key: %s", key
.c_str());
758 // prune expired tags
759 r
= expire_tags(hctx
, &id
);
768 * @param client_id (string) - unique client id
769 * @param commit_position (ObjectSetPosition)
772 * @returns 0 on success, negative error code on failure
774 int journal_client_commit(cls_method_context_t hctx
, bufferlist
*in
,
777 cls::journal::ObjectSetPosition commit_position
;
779 auto iter
= in
->cbegin();
781 decode(commit_position
, iter
);
782 } catch (const buffer::error
&err
) {
783 CLS_ERR("failed to decode input parameters: %s", err
.what());
788 int r
= read_key(hctx
, HEADER_KEY_SPLAY_WIDTH
, &splay_width
);
792 if (commit_position
.object_positions
.size() > splay_width
) {
793 CLS_ERR("too many object positions");
797 std::string
key(key_from_client_id(id
));
798 cls::journal::Client client
;
799 r
= read_key(hctx
, key
, &client
);
804 if (client
.commit_position
== commit_position
) {
808 client
.commit_position
= commit_position
;
809 r
= write_key(hctx
, key
, client
);
818 * @param start_after (string)
819 * @param max_return (uint64_t)
822 * clients (set<cls::journal::Client>) - collection of registered clients
823 * @returns 0 on success, negative error code on failure
825 int journal_client_list(cls_method_context_t hctx
, bufferlist
*in
,
827 std::string start_after
;
830 auto iter
= in
->cbegin();
831 decode(start_after
, iter
);
832 decode(max_return
, iter
);
833 } catch (const buffer::error
&err
) {
834 CLS_ERR("failed to decode input parameters: %s", err
.what());
838 std::set
<cls::journal::Client
> clients
;
839 int r
= get_client_list_range(hctx
, &clients
, start_after
, max_return
);
843 encode(clients
, *out
);
852 * @returns 0 on success, negative error code on failure
854 int journal_get_next_tag_tid(cls_method_context_t hctx
, bufferlist
*in
,
857 int r
= read_key(hctx
, HEADER_KEY_NEXT_TAG_TID
, &tag_tid
);
862 encode(tag_tid
, *out
);
868 * @param tag_tid (uint64_t)
872 * @returns 0 on success, negative error code on failure
874 int journal_get_tag(cls_method_context_t hctx
, bufferlist
*in
,
878 auto iter
= in
->cbegin();
879 decode(tag_tid
, iter
);
880 } catch (const buffer::error
&err
) {
881 CLS_ERR("failed to decode input parameters: %s", err
.what());
885 std::string
key(key_from_tag_tid(tag_tid
));
886 cls::journal::Tag tag
;
887 int r
= read_key(hctx
, key
, &tag
);
898 * @param tag_tid (uint64_t)
899 * @param tag_class (uint64_t)
900 * @param data (bufferlist)
903 * @returns 0 on success, negative error code on failure
905 int journal_tag_create(cls_method_context_t hctx
, bufferlist
*in
,
911 auto iter
= in
->cbegin();
912 decode(tag_tid
, iter
);
913 decode(tag_class
, iter
);
915 } catch (const buffer::error
&err
) {
916 CLS_ERR("failed to decode input parameters: %s", err
.what());
920 std::string
key(key_from_tag_tid(tag_tid
));
921 bufferlist stored_tag_bl
;
922 int r
= cls_cxx_map_get_val(hctx
, key
, &stored_tag_bl
);
924 CLS_ERR("duplicate tag id: %" PRIu64
, tag_tid
);
926 } else if (r
!= -ENOENT
) {
930 // verify tag tid ordering
931 uint64_t next_tag_tid
;
932 r
= read_key(hctx
, HEADER_KEY_NEXT_TAG_TID
, &next_tag_tid
);
936 if (tag_tid
!= next_tag_tid
) {
937 CLS_LOG(5, "out-of-order tag sequence: %" PRIu64
, tag_tid
);
941 uint64_t next_tag_class
;
942 r
= read_key(hctx
, HEADER_KEY_NEXT_TAG_CLASS
, &next_tag_class
);
947 if (tag_class
== cls::journal::Tag::TAG_CLASS_NEW
) {
948 // allocate a new tag class
949 tag_class
= next_tag_class
;
950 r
= write_key(hctx
, HEADER_KEY_NEXT_TAG_CLASS
, tag_class
+ 1);
955 // verify tag class range
956 if (tag_class
>= next_tag_class
) {
957 CLS_ERR("out-of-sequence tag class: %" PRIu64
, tag_class
);
962 // prune expired tags
963 r
= expire_tags(hctx
, nullptr);
968 // update tag tid sequence
969 r
= write_key(hctx
, HEADER_KEY_NEXT_TAG_TID
, tag_tid
+ 1);
974 // write tag structure
975 cls::journal::Tag
tag(tag_tid
, tag_class
, data
);
976 key
= key_from_tag_tid(tag_tid
);
977 r
= write_key(hctx
, key
, tag
);
986 * @param start_after_tag_tid (uint64_t) - first tag tid
987 * @param max_return (uint64_t) - max tags to return
988 * @param client_id (std::string) - client id filter
989 * @param tag_class (boost::optional<uint64_t>) - optional tag class filter
992 * std::set<cls::journal::Tag> - collection of tags
993 * @returns 0 on success, negative error code on failure
995 int journal_tag_list(cls_method_context_t hctx
, bufferlist
*in
,
997 uint64_t start_after_tag_tid
;
999 std::string client_id
;
1000 boost::optional
<uint64_t> tag_class(0);
1002 // handle compiler false positive about use-before-init
1003 tag_class
= boost::none
;
1005 auto iter
= in
->cbegin();
1006 decode(start_after_tag_tid
, iter
);
1007 decode(max_return
, iter
);
1008 decode(client_id
, iter
);
1009 decode(tag_class
, iter
);
1010 } catch (const buffer::error
&err
) {
1011 CLS_ERR("failed to decode input parameters: %s", err
.what());
1015 // calculate the minimum tag within client's commit position
1016 uint64_t minimum_tag_tid
= std::numeric_limits
<uint64_t>::max();
1017 cls::journal::Client client
;
1018 int r
= read_key(hctx
, key_from_client_id(client_id
), &client
);
1023 for (auto object_position
: client
.commit_position
.object_positions
) {
1024 minimum_tag_tid
= std::min(minimum_tag_tid
, object_position
.tag_tid
);
1027 // compute minimum tags in use per-class
1028 std::set
<cls::journal::Tag
> tags
;
1029 std::map
<uint64_t, uint64_t> minimum_tag_class_to_tids
;
1030 typedef enum { TAG_PASS_CALCULATE_MINIMUMS
,
1032 TAG_PASS_DONE
} TagPass
;
1033 int tag_pass
= (minimum_tag_tid
== std::numeric_limits
<uint64_t>::max() ?
1034 TAG_PASS_LIST
: TAG_PASS_CALCULATE_MINIMUMS
);
1035 std::string last_read
= HEADER_KEY_TAG_PREFIX
;
1037 std::map
<std::string
, bufferlist
> vals
;
1039 r
= cls_cxx_map_get_vals(hctx
, last_read
, HEADER_KEY_TAG_PREFIX
,
1040 MAX_KEYS_READ
, &vals
, &more
);
1041 if (r
< 0 && r
!= -ENOENT
) {
1042 CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r
).c_str());
1046 for (auto &val
: vals
) {
1047 cls::journal::Tag tag
;
1048 auto iter
= val
.second
.cbegin();
1051 } catch (const buffer::error
&err
) {
1052 CLS_ERR("error decoding tag: %s", val
.first
.c_str());
1056 if (tag_pass
== TAG_PASS_CALCULATE_MINIMUMS
) {
1057 minimum_tag_class_to_tids
[tag
.tag_class
] = tag
.tid
;
1059 // completed calculation of tag class minimums
1060 if (tag
.tid
>= minimum_tag_tid
) {
1065 } else if (tag_pass
== TAG_PASS_LIST
) {
1066 if (start_after_tag_tid
!= 0 && tag
.tid
<= start_after_tag_tid
) {
1070 if (tag
.tid
>= minimum_tag_class_to_tids
[tag
.tag_class
] &&
1071 (!tag_class
|| *tag_class
== tag
.tag_class
)) {
1074 if (tags
.size() >= max_return
) {
1075 tag_pass
= TAG_PASS_DONE
;
1080 if (tag_pass
!= TAG_PASS_DONE
&& !more
) {
1081 last_read
= HEADER_KEY_TAG_PREFIX
;
1083 } else if (!vals
.empty()) {
1084 last_read
= vals
.rbegin()->first
;
1086 } while (tag_pass
!= TAG_PASS_DONE
);
1094 * @param soft_max_size (uint64_t)
1097 * @returns 0 if object size less than max, negative error code otherwise
1099 int journal_object_guard_append(cls_method_context_t hctx
, bufferlist
*in
,
1101 uint64_t soft_max_size
;
1103 auto iter
= in
->cbegin();
1104 decode(soft_max_size
, iter
);
1105 } catch (const buffer::error
&err
) {
1106 CLS_ERR("failed to decode input parameters: %s", err
.what());
1112 int r
= cls_cxx_stat(hctx
, &size
, &mtime
);
1116 CLS_ERR("failed to stat object: %s", cpp_strerror(r
).c_str());
1120 if (size
>= soft_max_size
) {
1121 CLS_LOG(5, "journal object full: %" PRIu64
" >= %" PRIu64
,
1122 size
, soft_max_size
);
1130 CLS_LOG(20, "Loaded journal class!");
1132 cls_handle_t h_class
;
1133 cls_method_handle_t h_journal_create
;
1134 cls_method_handle_t h_journal_get_order
;
1135 cls_method_handle_t h_journal_get_splay_width
;
1136 cls_method_handle_t h_journal_get_pool_id
;
1137 cls_method_handle_t h_journal_get_minimum_set
;
1138 cls_method_handle_t h_journal_set_minimum_set
;
1139 cls_method_handle_t h_journal_get_active_set
;
1140 cls_method_handle_t h_journal_set_active_set
;
1141 cls_method_handle_t h_journal_get_client
;
1142 cls_method_handle_t h_journal_client_register
;
1143 cls_method_handle_t h_journal_client_update_data
;
1144 cls_method_handle_t h_journal_client_update_state
;
1145 cls_method_handle_t h_journal_client_unregister
;
1146 cls_method_handle_t h_journal_client_commit
;
1147 cls_method_handle_t h_journal_client_list
;
1148 cls_method_handle_t h_journal_get_next_tag_tid
;
1149 cls_method_handle_t h_journal_get_tag
;
1150 cls_method_handle_t h_journal_tag_create
;
1151 cls_method_handle_t h_journal_tag_list
;
1152 cls_method_handle_t h_journal_object_guard_append
;
1154 cls_register("journal", &h_class
);
1156 /// methods for journal.$journal_id objects
1157 cls_register_cxx_method(h_class
, "create",
1158 CLS_METHOD_RD
| CLS_METHOD_WR
,
1159 journal_create
, &h_journal_create
);
1160 cls_register_cxx_method(h_class
, "get_order",
1162 journal_get_order
, &h_journal_get_order
);
1163 cls_register_cxx_method(h_class
, "get_splay_width",
1165 journal_get_splay_width
, &h_journal_get_splay_width
);
1166 cls_register_cxx_method(h_class
, "get_pool_id",
1168 journal_get_pool_id
, &h_journal_get_pool_id
);
1169 cls_register_cxx_method(h_class
, "get_minimum_set",
1171 journal_get_minimum_set
,
1172 &h_journal_get_minimum_set
);
1173 cls_register_cxx_method(h_class
, "set_minimum_set",
1174 CLS_METHOD_RD
| CLS_METHOD_WR
,
1175 journal_set_minimum_set
,
1176 &h_journal_set_minimum_set
);
1177 cls_register_cxx_method(h_class
, "get_active_set",
1179 journal_get_active_set
,
1180 &h_journal_get_active_set
);
1181 cls_register_cxx_method(h_class
, "set_active_set",
1182 CLS_METHOD_RD
| CLS_METHOD_WR
,
1183 journal_set_active_set
,
1184 &h_journal_set_active_set
);
1186 cls_register_cxx_method(h_class
, "get_client",
1188 journal_get_client
, &h_journal_get_client
);
1189 cls_register_cxx_method(h_class
, "client_register",
1190 CLS_METHOD_RD
| CLS_METHOD_WR
,
1191 journal_client_register
, &h_journal_client_register
);
1192 cls_register_cxx_method(h_class
, "client_update_data",
1193 CLS_METHOD_RD
| CLS_METHOD_WR
,
1194 journal_client_update_data
,
1195 &h_journal_client_update_data
);
1196 cls_register_cxx_method(h_class
, "client_update_state",
1197 CLS_METHOD_RD
| CLS_METHOD_WR
,
1198 journal_client_update_state
,
1199 &h_journal_client_update_state
);
1200 cls_register_cxx_method(h_class
, "client_unregister",
1201 CLS_METHOD_RD
| CLS_METHOD_WR
,
1202 journal_client_unregister
,
1203 &h_journal_client_unregister
);
1204 cls_register_cxx_method(h_class
, "client_commit",
1205 CLS_METHOD_RD
| CLS_METHOD_WR
,
1206 journal_client_commit
, &h_journal_client_commit
);
1207 cls_register_cxx_method(h_class
, "client_list",
1209 journal_client_list
, &h_journal_client_list
);
1211 cls_register_cxx_method(h_class
, "get_next_tag_tid",
1213 journal_get_next_tag_tid
,
1214 &h_journal_get_next_tag_tid
);
1215 cls_register_cxx_method(h_class
, "get_tag",
1217 journal_get_tag
, &h_journal_get_tag
);
1218 cls_register_cxx_method(h_class
, "tag_create",
1219 CLS_METHOD_RD
| CLS_METHOD_WR
,
1220 journal_tag_create
, &h_journal_tag_create
);
1221 cls_register_cxx_method(h_class
, "tag_list",
1223 journal_tag_list
, &h_journal_tag_list
);
1225 /// methods for journal_data.$journal_id.$object_id objects
1226 cls_register_cxx_method(h_class
, "guard_append",
1227 CLS_METHOD_RD
| CLS_METHOD_WR
,
1228 journal_object_guard_append
,
1229 &h_journal_object_guard_append
);