1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/int_types.h"
5 #include "include/buffer.h"
6 #include "include/encoding.h"
7 #include "common/errno.h"
8 #include "objclass/objclass.h"
9 #include "cls/journal/cls_journal_types.h"
20 using ceph::bufferlist
;
26 static const uint64_t MAX_KEYS_READ
= 64;
28 static const std::string HEADER_KEY_ORDER
= "order";
29 static const std::string HEADER_KEY_SPLAY_WIDTH
= "splay_width";
30 static const std::string HEADER_KEY_POOL_ID
= "pool_id";
31 static const std::string HEADER_KEY_MINIMUM_SET
= "minimum_set";
32 static const std::string HEADER_KEY_ACTIVE_SET
= "active_set";
33 static const std::string HEADER_KEY_NEXT_TAG_TID
= "next_tag_tid";
34 static const std::string HEADER_KEY_NEXT_TAG_CLASS
= "next_tag_class";
35 static const std::string HEADER_KEY_CLIENT_PREFIX
= "client_";
36 static const std::string HEADER_KEY_TAG_PREFIX
= "tag_";
38 std::string
to_hex(uint64_t value
) {
39 std::ostringstream oss
;
40 oss
<< std::setw(16) << std::setfill('0') << std::hex
<< value
;
44 std::string
key_from_client_id(const std::string
&client_id
) {
45 return HEADER_KEY_CLIENT_PREFIX
+ client_id
;
48 std::string
key_from_tag_tid(uint64_t tag_tid
) {
49 return HEADER_KEY_TAG_PREFIX
+ to_hex(tag_tid
);
52 uint64_t tag_tid_from_key(const std::string
&key
) {
53 std::istringstream
iss(key
);
55 iss
.ignore(HEADER_KEY_TAG_PREFIX
.size()) >> std::hex
>> id
;
60 int read_key(cls_method_context_t hctx
, const string
&key
, T
*t
,
61 bool ignore_enoent
= false) {
63 int r
= cls_cxx_map_get_val(hctx
, key
, &bl
);
70 CLS_ERR("failed to get omap key: %s", key
.c_str());
75 auto iter
= bl
.cbegin();
77 } catch (const ceph::buffer::error
&err
) {
78 CLS_ERR("failed to decode input parameters: %s", err
.what());
85 int write_key(cls_method_context_t hctx
, const string
&key
, const T
&t
) {
89 int r
= cls_cxx_map_set_val(hctx
, key
, &bl
);
91 CLS_ERR("failed to set omap key: %s", key
.c_str());
97 int remove_key(cls_method_context_t hctx
, const string
&key
) {
98 int r
= cls_cxx_map_remove_key(hctx
, key
);
99 if (r
< 0 && r
!= -ENOENT
) {
100 CLS_ERR("failed to remove key: %s", key
.c_str());
106 int expire_tags(cls_method_context_t hctx
, const std::string
*skip_client_id
) {
108 std::string skip_client_key
;
109 if (skip_client_id
!= nullptr) {
110 skip_client_key
= key_from_client_id(*skip_client_id
);
113 uint64_t minimum_tag_tid
= std::numeric_limits
<uint64_t>::max();
114 std::string last_read
= "";
117 std::map
<std::string
, bufferlist
> vals
;
118 int r
= cls_cxx_map_get_vals(hctx
, last_read
, HEADER_KEY_CLIENT_PREFIX
,
119 MAX_KEYS_READ
, &vals
, &more
);
120 if (r
< 0 && r
!= -ENOENT
) {
121 CLS_ERR("failed to retrieve registered clients: %s",
122 cpp_strerror(r
).c_str());
126 for (auto &val
: vals
) {
127 // if we are removing a client, skip its commit positions
128 if (val
.first
== skip_client_key
) {
132 cls::journal::Client client
;
133 auto iter
= val
.second
.cbegin();
135 decode(client
, iter
);
136 } catch (const ceph::buffer::error
&err
) {
137 CLS_ERR("error decoding registered client: %s",
142 if (client
.state
== cls::journal::CLIENT_STATE_DISCONNECTED
) {
143 // don't allow a disconnected client to prevent pruning
145 } else if (client
.commit_position
.object_positions
.empty()) {
146 // cannot prune if one or more clients has an empty commit history
150 for (auto object_position
: client
.commit_position
.object_positions
) {
151 minimum_tag_tid
= std::min(minimum_tag_tid
, object_position
.tag_tid
);
155 last_read
= vals
.rbegin()->first
;
159 // cannot expire tags if a client hasn't committed yet
160 if (minimum_tag_tid
== std::numeric_limits
<uint64_t>::max()) {
164 // compute the minimum in-use tag for each class
165 std::map
<uint64_t, uint64_t> minimum_tag_class_to_tids
;
166 typedef enum { TAG_PASS_CALCULATE_MINIMUMS
,
168 TAG_PASS_DONE
} TagPass
;
169 int tag_pass
= TAG_PASS_CALCULATE_MINIMUMS
;
170 last_read
= HEADER_KEY_TAG_PREFIX
;
172 std::map
<std::string
, bufferlist
> vals
;
173 int r
= cls_cxx_map_get_vals(hctx
, last_read
, HEADER_KEY_TAG_PREFIX
,
174 MAX_KEYS_READ
, &vals
, &more
);
175 if (r
< 0 && r
!= -ENOENT
) {
176 CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r
).c_str());
180 for (auto &val
: vals
) {
181 cls::journal::Tag tag
;
182 auto iter
= val
.second
.cbegin();
185 } catch (const ceph::buffer::error
&err
) {
186 CLS_ERR("error decoding tag: %s", val
.first
.c_str());
190 if (tag
.tid
!= tag_tid_from_key(val
.first
)) {
191 CLS_ERR("tag tid mismatched: %s", val
.first
.c_str());
195 if (tag_pass
== TAG_PASS_CALCULATE_MINIMUMS
) {
196 minimum_tag_class_to_tids
[tag
.tag_class
] = tag
.tid
;
197 } else if (tag_pass
== TAG_PASS_SCRUB
&&
198 tag
.tid
< minimum_tag_class_to_tids
[tag
.tag_class
]) {
199 r
= remove_key(hctx
, val
.first
);
205 if (tag
.tid
>= minimum_tag_tid
) {
206 // no need to check for tag classes beyond this point
213 if (tag_pass
!= TAG_PASS_DONE
&& !more
) {
214 last_read
= HEADER_KEY_TAG_PREFIX
;
216 } else if (!vals
.empty()) {
217 last_read
= vals
.rbegin()->first
;
219 } while (tag_pass
!= TAG_PASS_DONE
);
223 int get_client_list_range(cls_method_context_t hctx
,
224 std::set
<cls::journal::Client
> *clients
,
225 std::string start_after
, uint64_t max_return
) {
226 std::string last_read
;
227 if (!start_after
.empty()) {
228 last_read
= key_from_client_id(start_after
);
231 std::map
<std::string
, bufferlist
> vals
;
233 int r
= cls_cxx_map_get_vals(hctx
, last_read
, HEADER_KEY_CLIENT_PREFIX
,
234 max_return
, &vals
, &more
);
236 CLS_ERR("failed to retrieve omap values: %s", cpp_strerror(r
).c_str());
240 for (std::map
<std::string
, bufferlist
>::iterator it
= vals
.begin();
241 it
!= vals
.end(); ++it
) {
243 auto iter
= it
->second
.cbegin();
245 cls::journal::Client client
;
246 decode(client
, iter
);
247 clients
->insert(client
);
248 } catch (const ceph::buffer::error
&err
) {
249 CLS_ERR("could not decode client '%s': %s", it
->first
.c_str(),
258 int find_min_commit_position(cls_method_context_t hctx
,
259 cls::journal::ObjectSetPosition
*minset
) {
262 std::string start_after
= "";
263 uint64_t tag_tid
= 0, entry_tid
= 0;
266 std::set
<cls::journal::Client
> batch
;
268 r
= get_client_list_range(hctx
, &batch
, start_after
, cls::journal::JOURNAL_MAX_RETURN
);
269 if ((r
< 0) || batch
.empty()) {
273 start_after
= batch
.rbegin()->id
;
274 // update the (minimum) commit position from this batch of clients
275 for (const auto &client
: batch
) {
276 if (client
.state
== cls::journal::CLIENT_STATE_DISCONNECTED
) {
279 const auto &object_set_position
= client
.commit_position
;
280 if (object_set_position
.object_positions
.empty()) {
281 *minset
= cls::journal::ObjectSetPosition();
284 cls::journal::ObjectPosition first
= object_set_position
.object_positions
.front();
286 // least tag_tid (or least entry_tid for matching tag_tid)
287 if (!valid
|| (tag_tid
> first
.tag_tid
) || ((tag_tid
== first
.tag_tid
) && (entry_tid
> first
.entry_tid
))) {
288 tag_tid
= first
.tag_tid
;
289 entry_tid
= first
.entry_tid
;
290 *minset
= cls::journal::ObjectSetPosition(object_set_position
);
295 // got the last batch, we're done
296 if (batch
.size() < cls::journal::JOURNAL_MAX_RETURN
) {
304 } // anonymous namespace
308 * @param order (uint8_t) - bits to shift to compute the object max size
309 * @param splay_width (uint8_t) - number of active journal objects
312 * @returns 0 on success, negative error code on failure
314 int journal_create(cls_method_context_t hctx
, bufferlist
*in
, bufferlist
*out
) {
319 auto iter
= in
->cbegin();
321 decode(splay_width
, iter
);
322 decode(pool_id
, iter
);
323 } catch (const ceph::buffer::error
&err
) {
324 CLS_ERR("failed to decode input parameters: %s", err
.what());
328 bufferlist stored_orderbl
;
329 int r
= cls_cxx_map_get_val(hctx
, HEADER_KEY_ORDER
, &stored_orderbl
);
331 CLS_ERR("journal already exists");
333 } else if (r
!= -ENOENT
) {
337 r
= write_key(hctx
, HEADER_KEY_ORDER
, order
);
342 r
= write_key(hctx
, HEADER_KEY_SPLAY_WIDTH
, splay_width
);
347 r
= write_key(hctx
, HEADER_KEY_POOL_ID
, pool_id
);
352 uint64_t object_set
= 0;
353 r
= write_key(hctx
, HEADER_KEY_ACTIVE_SET
, object_set
);
358 r
= write_key(hctx
, HEADER_KEY_MINIMUM_SET
, object_set
);
364 r
= write_key(hctx
, HEADER_KEY_NEXT_TAG_TID
, tag_id
);
369 r
= write_key(hctx
, HEADER_KEY_NEXT_TAG_CLASS
, tag_id
);
382 * @returns 0 on success, negative error code on failure
384 int journal_get_order(cls_method_context_t hctx
, bufferlist
*in
,
387 int r
= read_key(hctx
, HEADER_KEY_ORDER
, &order
);
401 * splay_width (uint8_t)
402 * @returns 0 on success, negative error code on failure
404 int journal_get_splay_width(cls_method_context_t hctx
, bufferlist
*in
,
407 int r
= read_key(hctx
, HEADER_KEY_SPLAY_WIDTH
, &splay_width
);
412 encode(splay_width
, *out
);
422 * @returns 0 on success, negative error code on failure
424 int journal_get_pool_id(cls_method_context_t hctx
, bufferlist
*in
,
427 int r
= read_key(hctx
, HEADER_KEY_POOL_ID
, &pool_id
);
432 encode(pool_id
, *out
);
441 * object set (uint64_t)
442 * @returns 0 on success, negative error code on failure
444 int journal_get_minimum_set(cls_method_context_t hctx
, bufferlist
*in
,
446 uint64_t minimum_set
;
447 int r
= read_key(hctx
, HEADER_KEY_MINIMUM_SET
, &minimum_set
);
452 encode(minimum_set
, *out
);
458 * @param object_set (uint64_t)
461 * @returns 0 on success, negative error code on failure
463 int journal_set_minimum_set(cls_method_context_t hctx
, bufferlist
*in
,
467 auto iter
= in
->cbegin();
468 decode(object_set
, iter
);
469 } catch (const ceph::buffer::error
&err
) {
470 CLS_ERR("failed to decode input parameters: %s", err
.what());
474 uint64_t current_active_set
;
475 int r
= read_key(hctx
, HEADER_KEY_ACTIVE_SET
, ¤t_active_set
);
480 if (current_active_set
< object_set
) {
481 CLS_LOG(10, "active object set earlier than minimum: %" PRIu64
482 " < %" PRIu64
, current_active_set
, object_set
);
486 uint64_t current_minimum_set
;
487 r
= read_key(hctx
, HEADER_KEY_MINIMUM_SET
, ¤t_minimum_set
);
492 if (object_set
== current_minimum_set
) {
494 } else if (object_set
< current_minimum_set
) {
495 CLS_ERR("object number earlier than current object: %" PRIu64
" < %" PRIu64
,
496 object_set
, current_minimum_set
);
500 r
= write_key(hctx
, HEADER_KEY_MINIMUM_SET
, object_set
);
512 * object set (uint64_t)
513 * @returns 0 on success, negative error code on failure
515 int journal_get_active_set(cls_method_context_t hctx
, bufferlist
*in
,
518 int r
= read_key(hctx
, HEADER_KEY_ACTIVE_SET
, &active_set
);
523 encode(active_set
, *out
);
529 * @param object_set (uint64_t)
532 * @returns 0 on success, negative error code on failure
534 int journal_set_active_set(cls_method_context_t hctx
, bufferlist
*in
,
538 auto iter
= in
->cbegin();
539 decode(object_set
, iter
);
540 } catch (const ceph::buffer::error
&err
) {
541 CLS_ERR("failed to decode input parameters: %s", err
.what());
545 uint64_t current_minimum_set
;
546 int r
= read_key(hctx
, HEADER_KEY_MINIMUM_SET
, ¤t_minimum_set
);
551 if (current_minimum_set
> object_set
) {
552 CLS_ERR("minimum object set later than active: %" PRIu64
553 " > %" PRIu64
, current_minimum_set
, object_set
);
557 uint64_t current_active_set
;
558 r
= read_key(hctx
, HEADER_KEY_ACTIVE_SET
, ¤t_active_set
);
563 if (object_set
== current_active_set
) {
565 } else if (object_set
< current_active_set
) {
566 CLS_ERR("object number earlier than current object: %" PRIu64
" < %" PRIu64
,
567 object_set
, current_active_set
);
571 r
= write_key(hctx
, HEADER_KEY_ACTIVE_SET
, object_set
);
580 * @param id (string) - unique client id
583 * cls::journal::Client
584 * @returns 0 on success, negative error code on failure
586 int journal_get_client(cls_method_context_t hctx
, bufferlist
*in
,
590 auto iter
= in
->cbegin();
592 } catch (const ceph::buffer::error
&err
) {
593 CLS_ERR("failed to decode input parameters: %s", err
.what());
597 std::string
key(key_from_client_id(id
));
598 cls::journal::Client client
;
599 int r
= read_key(hctx
, key
, &client
);
604 encode(client
, *out
);
610 * @param id (string) - unique client id
611 * @param data (bufferlist) - opaque data associated to client
614 * @returns 0 on success, negative error code on failure
616 int journal_client_register(cls_method_context_t hctx
, bufferlist
*in
,
621 auto iter
= in
->cbegin();
624 } catch (const ceph::buffer::error
&err
) {
625 CLS_ERR("failed to decode input parameters: %s", err
.what());
630 int r
= read_key(hctx
, HEADER_KEY_ORDER
, &order
);
635 std::string
key(key_from_client_id(id
));
636 bufferlist stored_clientbl
;
637 r
= cls_cxx_map_get_val(hctx
, key
, &stored_clientbl
);
639 CLS_ERR("duplicate client id: %s", id
.c_str());
641 } else if (r
!= -ENOENT
) {
645 cls::journal::ObjectSetPosition minset
;
646 r
= find_min_commit_position(hctx
, &minset
);
650 cls::journal::Client
client(id
, data
, minset
);
651 r
= write_key(hctx
, key
, client
);
660 * @param id (string) - unique client id
661 * @param data (bufferlist) - opaque data associated to client
664 * @returns 0 on success, negative error code on failure
666 int journal_client_update_data(cls_method_context_t hctx
, bufferlist
*in
,
671 auto iter
= in
->cbegin();
674 } catch (const ceph::buffer::error
&err
) {
675 CLS_ERR("failed to decode input parameters: %s", err
.what());
679 std::string
key(key_from_client_id(id
));
680 cls::journal::Client client
;
681 int r
= read_key(hctx
, key
, &client
);
687 r
= write_key(hctx
, key
, client
);
696 * @param id (string) - unique client id
697 * @param state (uint8_t) - client state
700 * @returns 0 on success, negative error code on failure
702 int journal_client_update_state(cls_method_context_t hctx
, bufferlist
*in
,
705 cls::journal::ClientState state
;
708 auto iter
= in
->cbegin();
711 decode(state_raw
, iter
);
712 state
= static_cast<cls::journal::ClientState
>(state_raw
);
713 } catch (const ceph::buffer::error
&err
) {
714 CLS_ERR("failed to decode input parameters: %s", err
.what());
718 std::string
key(key_from_client_id(id
));
719 cls::journal::Client client
;
720 int r
= read_key(hctx
, key
, &client
);
725 client
.state
= state
;
726 r
= write_key(hctx
, key
, client
);
735 * @param id (string) - unique client id
738 * @returns 0 on success, negative error code on failure
740 int journal_client_unregister(cls_method_context_t hctx
, bufferlist
*in
,
744 auto iter
= in
->cbegin();
746 } catch (const ceph::buffer::error
&err
) {
747 CLS_ERR("failed to decode input parameters: %s", err
.what());
751 std::string
key(key_from_client_id(id
));
753 int r
= cls_cxx_map_get_val(hctx
, key
, &bl
);
755 CLS_ERR("client is not registered: %s", id
.c_str());
759 r
= cls_cxx_map_remove_key(hctx
, key
);
761 CLS_ERR("failed to remove omap key: %s", key
.c_str());
765 // prune expired tags
766 r
= expire_tags(hctx
, &id
);
775 * @param client_id (string) - unique client id
776 * @param commit_position (ObjectSetPosition)
779 * @returns 0 on success, negative error code on failure
781 int journal_client_commit(cls_method_context_t hctx
, bufferlist
*in
,
784 cls::journal::ObjectSetPosition commit_position
;
786 auto iter
= in
->cbegin();
788 decode(commit_position
, iter
);
789 } catch (const ceph::buffer::error
&err
) {
790 CLS_ERR("failed to decode input parameters: %s", err
.what());
795 int r
= read_key(hctx
, HEADER_KEY_SPLAY_WIDTH
, &splay_width
);
799 if (commit_position
.object_positions
.size() > splay_width
) {
800 CLS_ERR("too many object positions");
804 std::string
key(key_from_client_id(id
));
805 cls::journal::Client client
;
806 r
= read_key(hctx
, key
, &client
);
811 if (client
.commit_position
== commit_position
) {
815 client
.commit_position
= commit_position
;
816 r
= write_key(hctx
, key
, client
);
825 * @param start_after (string)
826 * @param max_return (uint64_t)
829 * clients (set<cls::journal::Client>) - collection of registered clients
830 * @returns 0 on success, negative error code on failure
832 int journal_client_list(cls_method_context_t hctx
, bufferlist
*in
,
834 std::string start_after
;
837 auto iter
= in
->cbegin();
838 decode(start_after
, iter
);
839 decode(max_return
, iter
);
840 } catch (const ceph::buffer::error
&err
) {
841 CLS_ERR("failed to decode input parameters: %s", err
.what());
845 std::set
<cls::journal::Client
> clients
;
846 int r
= get_client_list_range(hctx
, &clients
, start_after
, max_return
);
850 encode(clients
, *out
);
859 * @returns 0 on success, negative error code on failure
861 int journal_get_next_tag_tid(cls_method_context_t hctx
, bufferlist
*in
,
864 int r
= read_key(hctx
, HEADER_KEY_NEXT_TAG_TID
, &tag_tid
);
869 encode(tag_tid
, *out
);
875 * @param tag_tid (uint64_t)
879 * @returns 0 on success, negative error code on failure
881 int journal_get_tag(cls_method_context_t hctx
, bufferlist
*in
,
885 auto iter
= in
->cbegin();
886 decode(tag_tid
, iter
);
887 } catch (const ceph::buffer::error
&err
) {
888 CLS_ERR("failed to decode input parameters: %s", err
.what());
892 std::string
key(key_from_tag_tid(tag_tid
));
893 cls::journal::Tag tag
;
894 int r
= read_key(hctx
, key
, &tag
);
905 * @param tag_tid (uint64_t)
906 * @param tag_class (uint64_t)
907 * @param data (bufferlist)
910 * @returns 0 on success, negative error code on failure
912 int journal_tag_create(cls_method_context_t hctx
, bufferlist
*in
,
918 auto iter
= in
->cbegin();
919 decode(tag_tid
, iter
);
920 decode(tag_class
, iter
);
922 } catch (const ceph::buffer::error
&err
) {
923 CLS_ERR("failed to decode input parameters: %s", err
.what());
927 std::string
key(key_from_tag_tid(tag_tid
));
928 bufferlist stored_tag_bl
;
929 int r
= cls_cxx_map_get_val(hctx
, key
, &stored_tag_bl
);
931 CLS_ERR("duplicate tag id: %" PRIu64
, tag_tid
);
933 } else if (r
!= -ENOENT
) {
937 // verify tag tid ordering
938 uint64_t next_tag_tid
;
939 r
= read_key(hctx
, HEADER_KEY_NEXT_TAG_TID
, &next_tag_tid
);
943 if (tag_tid
!= next_tag_tid
) {
944 CLS_LOG(5, "out-of-order tag sequence: %" PRIu64
, tag_tid
);
948 uint64_t next_tag_class
;
949 r
= read_key(hctx
, HEADER_KEY_NEXT_TAG_CLASS
, &next_tag_class
);
954 if (tag_class
== cls::journal::Tag::TAG_CLASS_NEW
) {
955 // allocate a new tag class
956 tag_class
= next_tag_class
;
957 r
= write_key(hctx
, HEADER_KEY_NEXT_TAG_CLASS
, tag_class
+ 1);
962 // verify tag class range
963 if (tag_class
>= next_tag_class
) {
964 CLS_ERR("out-of-sequence tag class: %" PRIu64
, tag_class
);
969 // prune expired tags
970 r
= expire_tags(hctx
, nullptr);
975 // update tag tid sequence
976 r
= write_key(hctx
, HEADER_KEY_NEXT_TAG_TID
, tag_tid
+ 1);
981 // write tag structure
982 cls::journal::Tag
tag(tag_tid
, tag_class
, data
);
983 key
= key_from_tag_tid(tag_tid
);
984 r
= write_key(hctx
, key
, tag
);
993 * @param start_after_tag_tid (uint64_t) - tag tid to start listing after (0 to start from the first tag)
994 * @param max_return (uint64_t) - max tags to return
995 * @param client_id (std::string) - client id filter
996 * @param tag_class (boost::optional<uint64_t>) - optional tag class filter
999 * std::set<cls::journal::Tag> - collection of tags
1000 * @returns 0 on success, negative error code on failure
1002 int journal_tag_list(cls_method_context_t hctx
, bufferlist
*in
,
1004 uint64_t start_after_tag_tid
;
1005 uint64_t max_return
;
1006 std::string client_id
;
1007 boost::optional
<uint64_t> tag_class(0);
1009 // handle compiler false positive about use-before-init
1010 tag_class
= boost::none
;
1012 auto iter
= in
->cbegin();
1013 decode(start_after_tag_tid
, iter
);
1014 decode(max_return
, iter
);
1015 decode(client_id
, iter
);
1016 decode(tag_class
, iter
);
1017 } catch (const ceph::buffer::error
&err
) {
1018 CLS_ERR("failed to decode input parameters: %s", err
.what());
1022 // calculate the minimum tag within client's commit position
1023 uint64_t minimum_tag_tid
= std::numeric_limits
<uint64_t>::max();
1024 cls::journal::Client client
;
1025 int r
= read_key(hctx
, key_from_client_id(client_id
), &client
);
1030 for (auto object_position
: client
.commit_position
.object_positions
) {
1031 minimum_tag_tid
= std::min(minimum_tag_tid
, object_position
.tag_tid
);
1034 // compute minimum tags in use per-class
1035 std::set
<cls::journal::Tag
> tags
;
1036 std::map
<uint64_t, uint64_t> minimum_tag_class_to_tids
;
1037 typedef enum { TAG_PASS_CALCULATE_MINIMUMS
,
1039 TAG_PASS_DONE
} TagPass
;
1040 int tag_pass
= (minimum_tag_tid
== std::numeric_limits
<uint64_t>::max() ?
1041 TAG_PASS_LIST
: TAG_PASS_CALCULATE_MINIMUMS
);
1042 std::string last_read
= HEADER_KEY_TAG_PREFIX
;
1044 std::map
<std::string
, bufferlist
> vals
;
1046 r
= cls_cxx_map_get_vals(hctx
, last_read
, HEADER_KEY_TAG_PREFIX
,
1047 MAX_KEYS_READ
, &vals
, &more
);
1048 if (r
< 0 && r
!= -ENOENT
) {
1049 CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r
).c_str());
1053 for (auto &val
: vals
) {
1054 cls::journal::Tag tag
;
1055 auto iter
= val
.second
.cbegin();
1058 } catch (const ceph::buffer::error
&err
) {
1059 CLS_ERR("error decoding tag: %s", val
.first
.c_str());
1063 if (tag_pass
== TAG_PASS_CALCULATE_MINIMUMS
) {
1064 minimum_tag_class_to_tids
[tag
.tag_class
] = tag
.tid
;
1066 // completed calculation of tag class minimums
1067 if (tag
.tid
>= minimum_tag_tid
) {
1072 } else if (tag_pass
== TAG_PASS_LIST
) {
1073 if (start_after_tag_tid
!= 0 && tag
.tid
<= start_after_tag_tid
) {
1077 if (tag
.tid
>= minimum_tag_class_to_tids
[tag
.tag_class
] &&
1078 (!tag_class
|| *tag_class
== tag
.tag_class
)) {
1081 if (tags
.size() >= max_return
) {
1082 tag_pass
= TAG_PASS_DONE
;
1087 if (tag_pass
!= TAG_PASS_DONE
&& !more
) {
1088 last_read
= HEADER_KEY_TAG_PREFIX
;
1090 } else if (!vals
.empty()) {
1091 last_read
= vals
.rbegin()->first
;
1093 } while (tag_pass
!= TAG_PASS_DONE
);
1101 * @param soft_max_size (uint64_t)
1104 * @returns 0 if the object size is less than soft_max_size, negative error code otherwise
1106 int journal_object_guard_append(cls_method_context_t hctx
, bufferlist
*in
,
1108 uint64_t soft_max_size
;
1110 auto iter
= in
->cbegin();
1111 decode(soft_max_size
, iter
);
1112 } catch (const ceph::buffer::error
&err
) {
1113 CLS_ERR("failed to decode input parameters: %s", err
.what());
1119 int r
= cls_cxx_stat(hctx
, &size
, &mtime
);
1123 CLS_ERR("failed to stat object: %s", cpp_strerror(r
).c_str());
1127 if (size
>= soft_max_size
) {
1128 CLS_LOG(5, "journal object full: %" PRIu64
" >= %" PRIu64
,
1129 size
, soft_max_size
);
1137 * @param soft_max_size (uint64_t)
1138 * @param data (bufferlist) data to append
1141 * @returns 0 on success, negative error code on failure
1142 * @returns -EOVERFLOW if the object size is greater than or equal to soft_max_size
1144 int journal_object_append(cls_method_context_t hctx
, bufferlist
*in
,
1146 uint64_t soft_max_size
;
1149 auto iter
= in
->cbegin();
1150 decode(soft_max_size
, iter
);
1152 } catch (const ceph::buffer::error
&err
) {
1153 CLS_ERR("failed to decode input parameters: %s", err
.what());
1158 int r
= cls_cxx_stat(hctx
, &size
, nullptr);
1159 if (r
< 0 && r
!= -ENOENT
) {
1160 CLS_ERR("append: failed to stat object: %s", cpp_strerror(r
).c_str());
1164 if (size
>= soft_max_size
) {
1165 CLS_LOG(5, "journal object full: %" PRIu64
" >= %" PRIu64
,
1166 size
, soft_max_size
);
1171 r
= cls_cxx_write2(hctx
, offset
, data
.length(), &data
,
1172 CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
);
1174 CLS_ERR("append: error when writing: %s", cpp_strerror(r
).c_str());
1178 if (cls_get_min_compatible_client(hctx
) < ceph_release_t::octopus
) {
1182 auto min_alloc_size
= cls_get_osd_min_alloc_size(hctx
);
1183 if (min_alloc_size
== 0) {
1187 auto stripe_width
= cls_get_pool_stripe_width(hctx
);
1188 if (stripe_width
> 0) {
1189 min_alloc_size
= round_up_to(min_alloc_size
, stripe_width
);
1192 CLS_LOG(20, "pad to %" PRIu64
, min_alloc_size
);
1194 auto end
= offset
+ data
.length();
1195 auto new_end
= round_up_to(end
, min_alloc_size
);
1196 if (new_end
== end
) {
1200 r
= cls_cxx_truncate(hctx
, new_end
);
1202 CLS_ERR("append: error when truncating: %s", cpp_strerror(r
).c_str());
1211 CLS_LOG(20, "Loaded journal class!");
1213 cls_handle_t h_class
;
1214 cls_method_handle_t h_journal_create
;
1215 cls_method_handle_t h_journal_get_order
;
1216 cls_method_handle_t h_journal_get_splay_width
;
1217 cls_method_handle_t h_journal_get_pool_id
;
1218 cls_method_handle_t h_journal_get_minimum_set
;
1219 cls_method_handle_t h_journal_set_minimum_set
;
1220 cls_method_handle_t h_journal_get_active_set
;
1221 cls_method_handle_t h_journal_set_active_set
;
1222 cls_method_handle_t h_journal_get_client
;
1223 cls_method_handle_t h_journal_client_register
;
1224 cls_method_handle_t h_journal_client_update_data
;
1225 cls_method_handle_t h_journal_client_update_state
;
1226 cls_method_handle_t h_journal_client_unregister
;
1227 cls_method_handle_t h_journal_client_commit
;
1228 cls_method_handle_t h_journal_client_list
;
1229 cls_method_handle_t h_journal_get_next_tag_tid
;
1230 cls_method_handle_t h_journal_get_tag
;
1231 cls_method_handle_t h_journal_tag_create
;
1232 cls_method_handle_t h_journal_tag_list
;
1233 cls_method_handle_t h_journal_object_guard_append
;
1234 cls_method_handle_t h_journal_object_append
;
1236 cls_register("journal", &h_class
);
1238 /// methods for journal.$journal_id objects
1239 cls_register_cxx_method(h_class
, "create",
1240 CLS_METHOD_RD
| CLS_METHOD_WR
,
1241 journal_create
, &h_journal_create
);
1242 cls_register_cxx_method(h_class
, "get_order",
1244 journal_get_order
, &h_journal_get_order
);
1245 cls_register_cxx_method(h_class
, "get_splay_width",
1247 journal_get_splay_width
, &h_journal_get_splay_width
);
1248 cls_register_cxx_method(h_class
, "get_pool_id",
1250 journal_get_pool_id
, &h_journal_get_pool_id
);
1251 cls_register_cxx_method(h_class
, "get_minimum_set",
1253 journal_get_minimum_set
,
1254 &h_journal_get_minimum_set
);
1255 cls_register_cxx_method(h_class
, "set_minimum_set",
1256 CLS_METHOD_RD
| CLS_METHOD_WR
,
1257 journal_set_minimum_set
,
1258 &h_journal_set_minimum_set
);
1259 cls_register_cxx_method(h_class
, "get_active_set",
1261 journal_get_active_set
,
1262 &h_journal_get_active_set
);
1263 cls_register_cxx_method(h_class
, "set_active_set",
1264 CLS_METHOD_RD
| CLS_METHOD_WR
,
1265 journal_set_active_set
,
1266 &h_journal_set_active_set
);
1268 cls_register_cxx_method(h_class
, "get_client",
1270 journal_get_client
, &h_journal_get_client
);
1271 cls_register_cxx_method(h_class
, "client_register",
1272 CLS_METHOD_RD
| CLS_METHOD_WR
,
1273 journal_client_register
, &h_journal_client_register
);
1274 cls_register_cxx_method(h_class
, "client_update_data",
1275 CLS_METHOD_RD
| CLS_METHOD_WR
,
1276 journal_client_update_data
,
1277 &h_journal_client_update_data
);
1278 cls_register_cxx_method(h_class
, "client_update_state",
1279 CLS_METHOD_RD
| CLS_METHOD_WR
,
1280 journal_client_update_state
,
1281 &h_journal_client_update_state
);
1282 cls_register_cxx_method(h_class
, "client_unregister",
1283 CLS_METHOD_RD
| CLS_METHOD_WR
,
1284 journal_client_unregister
,
1285 &h_journal_client_unregister
);
1286 cls_register_cxx_method(h_class
, "client_commit",
1287 CLS_METHOD_RD
| CLS_METHOD_WR
,
1288 journal_client_commit
, &h_journal_client_commit
);
1289 cls_register_cxx_method(h_class
, "client_list",
1291 journal_client_list
, &h_journal_client_list
);
1293 cls_register_cxx_method(h_class
, "get_next_tag_tid",
1295 journal_get_next_tag_tid
,
1296 &h_journal_get_next_tag_tid
);
1297 cls_register_cxx_method(h_class
, "get_tag",
1299 journal_get_tag
, &h_journal_get_tag
);
1300 cls_register_cxx_method(h_class
, "tag_create",
1301 CLS_METHOD_RD
| CLS_METHOD_WR
,
1302 journal_tag_create
, &h_journal_tag_create
);
1303 cls_register_cxx_method(h_class
, "tag_list",
1305 journal_tag_list
, &h_journal_tag_list
);
1307 /// methods for journal_data.$journal_id.$object_id objects
1308 cls_register_cxx_method(h_class
, "guard_append",
1309 CLS_METHOD_RD
| CLS_METHOD_WR
,
1310 journal_object_guard_append
,
1311 &h_journal_object_guard_append
);
1312 cls_register_cxx_method(h_class
, "append", CLS_METHOD_RD
| CLS_METHOD_WR
,
1313 journal_object_append
, &h_journal_object_append
);