2 * OSD classes for the key value store
4 * Created on: Aug 10, 2012
5 * Author: Eleanor Cawthon
8 #include "include/compat.h"
9 #include "objclass/objclass.h"
11 #include "key_value_store/kvs_arg_types.h"
12 #include "include/types.h"
21 * finds the index_data where a key belongs.
23 * @param key: the key to search for
24 * @param idata: the index_data for the first index value such that idata.key
25 * is greater than key.
26 * @param next_idata: the index_data for the next index entry after idata
27 * @pre: key is not encoded
28 * @post: idata contains complete information
// Looks up the index entry for `key` and fills idata / next_idata.
// The visible code performs two reads keyed on key_data(key).encoded():
// a range read limited to 2 entries (into kvmap) and an exact lookup
// (into raw_val). When the exact lookup reports -ENOENT or -ENODATA the
// entries from the range read are used instead.
//
// @param hctx: object class method context passed through to the cls_cxx_* calls
// @param key: unencoded key; it is wrapped in key_data(...).encoded() for both reads
// @param idata: output; its kdata.prefix is inspected in the not-found branch
// @param next_idata: output; decoded from the second kvmap entry in the not-found branch
31 static int get_idata_from_key(cls_method_context_t hctx
, const string
&key
,
32 index_data
&idata
, index_data
&next_idata
) {
35 std::map
<std::string
, bufferlist
> kvmap
;
// Range read: encoded key, empty string, and a limit of 2 entries.
39 r
= cls_cxx_map_get_vals(hctx
, key_data(key
).encoded(), "", 2, &kvmap
, &more
);
41 CLS_LOG(20, "error reading index for range %s: %d", key
.c_str(), r
);
// Exact lookup of the same encoded key.
45 r
= cls_cxx_map_get_val(hctx
, key_data(key
).encoded(), &raw_val
);
47 CLS_LOG(20, "%s is already in the index: %d", key
.c_str(), r
);
48 auto b
= raw_val
.cbegin();
51 auto b
= kvmap
.begin()->second
.cbegin();
// Key itself is not stored: fall back to the entries returned by the range read.
55 } else if (r
== -ENOENT
|| r
== -ENODATA
) {
// NOTE(review): kvmap.begin() is dereferenced here with no visible emptiness
// check directly before it — confirm kvmap cannot be empty on this path.
56 auto b
= kvmap
.begin()->second
.cbegin();
58 if (idata
.kdata
.prefix
!= "1") {
// NOTE(review): the second entry is dereferenced without a visible size
// check; if the range read returned fewer than two entries this reads
// past the end of kvmap — confirm.
59 auto nb
= (++kvmap
.begin())->second
.cbegin();
60 next_idata
.decode(nb
);
64 CLS_LOG(20, "error reading index for duplicates %s: %d", key
.c_str(), r
);
68 CLS_LOG(20, "idata is %s", idata
.str().c_str());
// Class-method entry point for get_idata_from_key.
// Takes an idata_from_key_args from the input bufferlist `in` (a decode
// failure is caught as buffer::error and logged) and calls
// get_idata_from_key with op.key, op.idata and op.next_idata.
// NOTE(review): how the result is written to `out` is not visible here.
73 static int get_idata_from_key_op(cls_method_context_t hctx
,
74 bufferlist
*in
, bufferlist
*out
) {
75 CLS_LOG(20, "get_idata_from_key_op");
76 idata_from_key_args op
;
77 auto it
= in
->cbegin();
80 } catch (buffer::error
& err
) {
81 CLS_LOG(20, "error decoding idata_from_key_args.");
84 int r
= get_idata_from_key(hctx
, op
.key
, op
.idata
, op
.next_idata
);
94 * finds the object in the index with the lowest key value that is greater
95 * than idata.key. If idata.key is the max key, returns -EOVERFLOW. If
96 * idata has a prefix and has timed out, cleans up.
98 * @param idata: idata for the object to search for.
99 * @param out_data: the idata for the next object.
101 * @pre: idata must contain a key.
102 * @post: out_data contains complete information
// Fetches the index entry that follows idata's key.
// The visible code issues a range read limited to 1 entry, keyed on
// idata.kdata.encoded(), then parses that entry's key into out_data.kdata
// and takes a decode iterator over its value.
//
// @param hctx: object class method context
// @param idata: entry whose encoded key is passed to the range read
// @param out_data: output; kdata is parsed from the returned entry's key
104 static int get_next_idata(cls_method_context_t hctx
, const index_data
&idata
,
105 index_data
&out_data
) {
107 std::map
<std::string
, bufferlist
> kvs
;
// Range read: encoded key of idata, empty string, limit of 1 entry.
109 r
= cls_cxx_map_get_vals(hctx
, idata
.kdata
.encoded(), "", 1, &kvs
, &more
);
111 CLS_LOG(20, "getting kvs failed with error %d", r
);
// NOTE(review): kvs.begin() is dereferenced below; any emptiness guard is
// not visible here — confirm one exists.
116 out_data
.kdata
.parse(kvs
.begin()->first
);
117 auto b
= kvs
.begin()->second
.cbegin();
// Class-method entry point for get_next_idata.
// Works on an idata_from_idata_args taken from `in` (buffer::error is
// caught on decode failure) and calls get_next_idata(hctx, op.idata,
// op.next_idata).
126 static int get_next_idata_op(cls_method_context_t hctx
,
127 bufferlist
*in
, bufferlist
*out
) {
128 CLS_LOG(20, "get_next_idata_op");
129 idata_from_idata_args op
;
130 auto it
= in
->cbegin();
133 } catch (buffer::error
& err
) {
136 int r
= get_next_idata(hctx
, op
.idata
, op
.next_idata
);
146 * finds the object in the index with the highest key value that is less
147 * than idata.key. If idata.key is the lowest key, returns -ERANGE. If
148 * idata has a prefix and has timed out, cleans up.
150 * @param idata: idata for the object to search for.
151 * @param out_data: the idata for the previous object.
153 * @pre: idata must contain a key.
154 * @post: out_data contains complete information
// Fetches the index entry that precedes idata's key.
// The visible code reads entries with an empty start key and a limit of
// LONG_MAX into kvs, locates idata's encoded key with lower_bound, logs
// when the entry found has a different key, treats kvs.begin() as "no
// previous entry", and finally parses the chosen entry's key into
// out_data.kdata and takes a decode iterator over its value.
//
// @param hctx: object class method context
// @param idata: entry whose encoded key is searched for
// @param out_data: output; kdata is parsed from the selected entry's key
156 static int get_prev_idata(cls_method_context_t hctx
, const index_data
&idata
,
157 index_data
&out_data
) {
159 std::map
<std::string
, bufferlist
> kvs
;
// Range read from the empty key with a LONG_MAX limit.
161 r
= cls_cxx_map_get_vals(hctx
, "", "", LONG_MAX
, &kvs
, &more
);
163 CLS_LOG(20, "getting kvs failed with error %d", r
);
167 std::map
<std::string
, bufferlist
>::iterator it
=
168 kvs
.lower_bound(idata
.kdata
.encoded());
// NOTE(review): `it` is dereferenced immediately after lower_bound with no
// check against kvs.end(); if every stored key sorts below the encoded key
// this dereferences the end iterator — confirm.
169 if (it
->first
!= idata
.kdata
.encoded()) {
170 CLS_LOG(20, "object %s not found in the index (expected %s, found %s)",
171 idata
.str().c_str(), idata
.kdata
.encoded().c_str(),
175 if (it
== kvs
.begin()) {
176 //it is the first object, there is no previous.
181 out_data
.kdata
.parse(it
->first
);
182 auto b
= it
->second
.cbegin();
// Class-method entry point for get_prev_idata.
// Works on an idata_from_idata_args taken from `in` (buffer::error is
// caught on decode failure) and calls get_prev_idata(hctx, op.idata,
// op.next_idata); the previous entry is therefore returned in the field
// named next_idata.
188 static int get_prev_idata_op(cls_method_context_t hctx
,
189 bufferlist
*in
, bufferlist
*out
) {
// NOTE(review): the trace message below names "get_next_idata_op" although
// this is get_prev_idata_op — looks like a copy/paste slip; confirm and fix.
190 CLS_LOG(20, "get_next_idata_op");
191 idata_from_idata_args op
;
192 auto it
= in
->cbegin();
195 } catch (buffer::error
& err
) {
198 int r
= get_prev_idata(hctx
, op
.idata
, op
.next_idata
);
208 * Read all of the index entries where any keys in the set go
// Reads the index entries spanning the given keys into *out.
// The visible code starts a range read at the encoded form of the smallest
// key (*keys.begin()) with a LONG_MAX limit, then, when more than one
// entry came back, erases a range of *out that begins at
// upper_bound(encoded form of the largest key, *keys.rbegin()).
//
// @param hctx: object class method context
// @param keys: unencoded keys; only the first and last are used in the visible code
// @param out: output map filled by the range read and then trimmed
//
// NOTE(review): progress messages here use CLS_ERR while sibling functions
// use CLS_LOG(20, ...) — confirm that is intentional.
210 static int read_many(cls_method_context_t hctx
, const set
<string
> &keys
,
211 map
<string
, bufferlist
> * out
) {
// NOTE(review): *keys.begin() is dereferenced; any empty-set guard is not
// visible here — confirm one exists.
214 CLS_ERR("reading from a map of size %d, first key encoded is %s",
215 (int)keys
.size(), key_data(*keys
.begin()).encoded().c_str());
216 r
= cls_cxx_map_get_vals(hctx
, key_data(*keys
.begin()).encoded().c_str(),
217 "", LONG_MAX
, out
, &more
);
219 CLS_ERR("getting omap vals failed with error %d", r
);
222 CLS_ERR("got map of size %d ", (int)out
->size());
223 if (out
->size() > 1) {
// Trim starting at the first entry whose key sorts above the last
// requested key.
224 out
->erase(out
->upper_bound(key_data(*keys
.rbegin()).encoded().c_str()),
227 CLS_ERR("returning map of size %d", (int)out
->size());
// Class-method entry point for read_many.
// Takes its argument `op` from `in` (buffer::error is caught on decode
// failure), calls read_many(hctx, op, &outmap) and encodes outmap into *out.
231 static int read_many_op(cls_method_context_t hctx
, bufferlist
*in
,
233 CLS_LOG(20, "read_many_op");
235 map
<string
, bufferlist
> outmap
;
236 auto it
= in
->cbegin();
239 } catch (buffer::error
& err
) {
242 int r
= read_many(hctx
, op
, &outmap
);
246 encode(outmap
, *out
);
252 * Checks the unwritable xattr. If it is "1" (i.e., it is unwritable), returns
253 * -EACCES. otherwise, returns 0.
// Reads the "unwritable" xattr of the object into bl, logs on a read
// error, and compares its contents (as a string of bl.length() bytes)
// against "1".
//
// @param hctx: object class method context
255 static int check_writable(cls_method_context_t hctx
) {
257 int r
= cls_cxx_getxattr(hctx
, "unwritable", &bl
);
259 CLS_LOG(20, "error reading xattr %s: %d", "unwritable", r
);
// "1" marks the object as unwritable.
262 if (string(bl
.c_str(), bl
.length()) == "1") {
// Class-method entry point for check_writable: logs the call and returns
// check_writable(hctx) unchanged. Neither `in` nor `out` is used in the
// visible code.
269 static int check_writable_op(cls_method_context_t hctx
,
270 bufferlist
*in
, bufferlist
*out
) {
271 CLS_LOG(20, "check_writable_op");
272 return check_writable(hctx
);
276 * returns -EKEYREJECTED if size is outside of bound, according to comparator.
278 * @param bound: the limit to test
279 * @param comparator: should be CEPH_OSD_CMPXATTR_OP_[EQ|GT|LT]
// Compares the object's "size" xattr against `bound`.
// The xattr is read into size_bl and converted with atoi; a switch on
// `comparator` handles CEPH_OSD_CMPXATTR_OP_EQ, _LT and _GT, each of which
// can return -EKEYREJECTED. An unrecognised comparator is logged as an
// invalid argument.
//
// @param hctx: object class method context
// @param bound: value the size is compared against
// NOTE(review): the per-case comparison expressions are not visible here.
281 static int assert_size_in_bound(cls_method_context_t hctx
, int bound
,
285 int r
= cls_cxx_getxattr(hctx
, "size", &size_bl
);
287 CLS_LOG(20, "error reading xattr %s: %d", "size", r
);
// The size xattr holds a decimal string; atoi yields 0 for non-numeric text.
291 int size
= atoi(string(size_bl
.c_str(), size_bl
.length()).c_str());
292 CLS_LOG(20, "size is %d, bound is %d", size
, bound
);
294 //compare size to comparator
295 switch (comparator
) {
296 case CEPH_OSD_CMPXATTR_OP_EQ
:
298 return -EKEYREJECTED
;
301 case CEPH_OSD_CMPXATTR_OP_LT
:
303 return -EKEYREJECTED
;
306 case CEPH_OSD_CMPXATTR_OP_GT
:
308 return -EKEYREJECTED
;
312 CLS_LOG(20, "invalid argument passed to assert_size_in_bound: %d",
// Class-method entry point for assert_size_in_bound.
// Takes its argument `op` from `in` (buffer::error is caught on decode
// failure) and returns assert_size_in_bound(hctx, op.bound, op.comparator).
319 static int assert_size_in_bound_op(cls_method_context_t hctx
,
320 bufferlist
*in
, bufferlist
*out
) {
321 CLS_LOG(20, "assert_size_in_bound_op");
323 auto it
= in
->cbegin();
326 } catch (buffer::error
& err
) {
329 return assert_size_in_bound(hctx
, op
.bound
, op
.comparator
);
333 * Attempts to insert omap into this object's omap.
336 * if unwritable, returns -EACCES.
337 * if size >= bound and key doesn't already exist in the omap, returns -EKEYREJECTED.
338 * if exclusive is true, returns -EEXIST if any keys already exist.
340 * @post: object has omap entries inserted, and size xattr is updated
// Inserts the entries of `omap` into this object and updates its "size"
// xattr. Visible steps, in order:
//   1. cls_cxx_stat on the object;
//   2. check_writable;
//   3. for every key in omap, look it up; a hit with a non-empty value is
//      the "already exists" case (exclusive-insert error message, or
//      assert_bound is increased); a read error other than
//      -ENODATA / -ENOENT is logged;
//   4. read the "size" xattr and return -EKEYREJECTED when it is
//      >= assert_bound;
//   5. write the omap entries, then write the new size xattr.
//
// @param hctx: object class method context
// @param omap: key/value pairs to insert
// @param bound: size limit; assert_bound starts at this value
// @param exclusive: see step 3 (the branch using it is not visible here)
342 static int omap_insert(cls_method_context_t hctx
,
343 const map
<string
, bufferlist
> &omap
, int bound
, bool exclusive
) {
347 int r
= cls_cxx_stat(hctx
, &size
, &time
);
// NOTE(review): omap.begin() is dereferenced for logging; any guard for an
// empty omap is not visible here — confirm.
351 CLS_LOG(20, "inserting %s", omap
.begin()->first
.c_str());
352 r
= check_writable(hctx
);
354 CLS_LOG(20, "omap_insert: this object is unwritable: %d", r
);
358 int assert_bound
= bound
;
360 //if this is an exclusive insert, make sure the key doesn't already exist.
361 for (map
<string
, bufferlist
>::const_iterator it
= omap
.begin();
362 it
!= omap
.end(); ++it
) {
364 r
= cls_cxx_map_get_val(hctx
, it
->first
, &bl
);
// A successful read with a non-empty value means the key is already present.
365 if (r
== 0 && string(bl
.c_str(), bl
.length()) != ""){
367 CLS_LOG(20, "error: this is an exclusive insert and %s exists.",
372 CLS_LOG(20, "increased assert_bound to %d", assert_bound
);
373 } else if (r
!= -ENODATA
&& r
!= -ENOENT
) {
374 CLS_LOG(20, "error reading omap val for %s: %d", it
->first
.c_str(), r
);
380 r
= cls_cxx_getxattr(hctx
, "size", &old_size
);
382 CLS_LOG(20, "error reading xattr %s: %d", "size", r
);
386 int old_size_int
= atoi(string(old_size
.c_str(), old_size
.length()).c_str());
388 CLS_LOG(20, "asserting size is less than %d (bound is %d)", assert_bound
, bound
);
389 if (old_size_int
>= assert_bound
) {
390 return -EKEYREJECTED
;
// (assert_bound - bound) is subtracted so that the amount by which
// assert_bound was raised above is not added to the size again.
393 int new_size_int
= old_size_int
+ omap
.size() - (assert_bound
- bound
);
394 CLS_LOG(20, "old size is %d, new size is %d", old_size_int
, new_size_int
);
398 new_size
.append(s
.str());
400 r
= cls_cxx_map_set_vals(hctx
, &omap
);
402 CLS_LOG(20, "error setting omap: %d", r
);
406 r
= cls_cxx_setxattr(hctx
, "size", &new_size
);
408 CLS_LOG(20, "error setting xattr %s: %d", "size", r
);
411 CLS_LOG(20, "successfully inserted %s", omap
.begin()->first
.c_str());
// Class-method entry point for omap_insert.
// Takes its argument `op` from `in` (buffer::error is caught on decode
// failure) and returns omap_insert(hctx, op.omap, op.bound, op.exclusive).
415 static int omap_insert_op(cls_method_context_t hctx
,
416 bufferlist
*in
, bufferlist
*out
) {
417 CLS_LOG(20, "omap_insert");
419 auto it
= in
->cbegin();
422 } catch (buffer::error
& err
) {
425 return omap_insert(hctx
, op
.omap
, op
.bound
, op
.exclusive
);
// Creates the object and populates it from `omap`.
// Visible steps, in order: cls_cxx_create(hctx, true); write the omap
// entries; set the "size" xattr from omap.size(); set the "unwritable"
// xattr from a bufferlist `u`.
//
// @param hctx: object class method context
// @param omap: initial key/value pairs; its element count becomes the size
428 static int create_with_omap(cls_method_context_t hctx
,
429 const map
<string
, bufferlist
> &omap
) {
// NOTE(review): omap.begin() is dereferenced for logging with no visible
// guard for an empty omap — confirm callers never pass one.
430 CLS_LOG(20, "creating with omap: %s", omap
.begin()->first
.c_str());
431 //create the object (second argument `true` is presumably the exclusive flag; TODO confirm)
432 int r
= cls_cxx_create(hctx
, true);
434 CLS_LOG(20, "omap create: creating failed: %d", r
);
438 int new_size_int
= omap
.size();
439 CLS_LOG(20, "omap insert: new size is %d", new_size_int
);
443 new_size
.append(s
.str());
445 r
= cls_cxx_map_set_vals(hctx
, &omap
);
447 CLS_LOG(20, "omap create: error setting omap: %d", r
);
451 r
= cls_cxx_setxattr(hctx
, "size", &new_size
);
453 CLS_LOG(20, "omap create: error setting xattr %s: %d", "size", r
);
459 r
= cls_cxx_setxattr(hctx
, "unwritable", &u
);
461 CLS_LOG(20, "omap create: error setting xattr %s: %d", "unwritable", r
);
465 CLS_LOG(20, "successfully created %s", omap
.begin()->first
.c_str());
// Class-method entry point for create_with_omap.
// Works on a map<string, bufferlist> taken from `in` (buffer::error is
// caught on decode failure) and returns create_with_omap(hctx, omap).
469 static int create_with_omap_op(cls_method_context_t hctx
,
470 bufferlist
*in
, bufferlist
*out
) {
// NOTE(review): the trace message below says "omap_insert" although this is
// create_with_omap_op — looks like a copy/paste slip; confirm and fix.
471 CLS_LOG(20, "omap_insert");
472 map
<string
, bufferlist
> omap
;
473 auto it
= in
->cbegin();
476 } catch (buffer::error
& err
) {
479 return create_with_omap(hctx
, omap
);
483 * Attempts to remove omap from this object's omap.
486 * if unwritable, returns -EACCES.
487 * if size <= bound and key doesn't already exist in the omap, returns -EKEYREJECTED.
488 * if any of the keys are not in this object, returns -ENODATA.
490 * @post: object has omap entries removed, and size xattr is updated
// Removes the given keys from this object and updates its "size" xattr.
// Visible steps, in order:
//   1. cls_cxx_stat on the object;
//   2. check_writable;
//   3. look up every key; -ENOENT, -ENODATA or an empty value is the
//      "missing key" case;
//   4. read the "size" xattr and return -EKEYREJECTED when it is <= bound;
//   5. write the new size xattr (old size minus omap.size());
//   6. remove each key with cls_cxx_map_remove_key.
//
// @param hctx: object class method context
// @param omap: the set of keys to remove (despite the name, it holds keys only)
// @param bound: the size must be strictly greater than this for removal to proceed
//
// NOTE(review): the size xattr is rewritten (step 5) before the keys are
// removed (step 6); confirm that ordering is intended if a removal fails.
492 static int omap_remove(cls_method_context_t hctx
,
493 const std::set
<string
> &omap
, int bound
) {
497 r
= cls_cxx_stat(hctx
, &size
, &time
);
502 //first make sure the object is writable
503 r
= check_writable(hctx
);
508 //check for existence of the key first
509 for (set
<string
>::const_iterator it
= omap
.begin();
510 it
!= omap
.end(); ++it
) {
512 r
= cls_cxx_map_get_val(hctx
, *it
, &bl
);
513 if (r
== -ENOENT
|| r
== -ENODATA
514 || string(bl
.c_str(), bl
.length()) == ""){
517 CLS_LOG(20, "error reading omap val for %s: %d", it
->c_str(), r
);
522 //fail if removing from an object with only bound entries.
524 r
= cls_cxx_getxattr(hctx
, "size", &old_size
);
526 CLS_LOG(20, "error reading xattr %s: %d", "size", r
);
529 int old_size_int
= atoi(string(old_size
.c_str(), old_size
.length()).c_str());
531 CLS_LOG(20, "asserting size is greater than %d", bound
);
532 if (old_size_int
<= bound
) {
533 return -EKEYREJECTED
;
536 int new_size_int
= old_size_int
- omap
.size();
537 CLS_LOG(20, "old size is %d, new size is %d", old_size_int
, new_size_int
);
541 new_size
.append(s
.str());
543 r
= cls_cxx_setxattr(hctx
, "size", &new_size
);
// NOTE(review): this message names "unwritable" but the call just above set
// the "size" xattr — the logged attribute name looks wrong; confirm and fix.
545 CLS_LOG(20, "error setting xattr %s: %d", "unwritable", r
);
549 for (std::set
<string
>::const_iterator it
= omap
.begin();
550 it
!= omap
.end(); ++it
) {
551 r
= cls_cxx_map_remove_key(hctx
, *it
);
553 CLS_LOG(20, "error removing omap: %d", r
);
// Class-method entry point for omap_remove.
// Takes its argument `op` from `in` (buffer::error is caught on decode
// failure) and returns omap_remove(hctx, op.omap, op.bound).
560 static int omap_remove_op(cls_method_context_t hctx
,
561 bufferlist
*in
, bufferlist
*out
) {
562 CLS_LOG(20, "omap_remove");
564 auto it
= in
->cbegin();
567 } catch (buffer::error
& err
) {
570 return omap_remove(hctx
, op
.omap
, op
.bound
);
574 * checks to see if this object needs to be split or rebalanced. if so, reads
575 * information about it.
577 * @post: if assert_size_in_bound(hctx, bound, comparator) succeeds,
578 * odata contains the size, omap, and unwritable attributes for this object.
579 * Otherwise, odata contains the size and unwritable attribute.
// Collects the state needed to decide on a split/rebalance into `odata`.
// Visible steps, in order: check_writable sets odata.unwritable (true on
// one path, false on the other); the "size" xattr is read and stored via
// atoi in odata.size; assert_size_in_bound(hctx, bound, comparator) is
// evaluated; then the object's entries are read (empty start key,
// LONG_MAX limit) into odata.omap.
//
// @param hctx: object class method context
// @param odata: output; unwritable, size and omap are filled in
// @param bound: forwarded to assert_size_in_bound
// @param comparator: forwarded to assert_size_in_bound
581 static int maybe_read_for_balance(cls_method_context_t hctx
,
582 object_data
&odata
, int bound
, int comparator
) {
583 CLS_LOG(20, "rebalance reading");
584 //if unwritable, return
585 int r
= check_writable(hctx
);
587 odata
.unwritable
= true;
588 CLS_LOG(20, "rebalance read: error getting xattr %s: %d", "unwritable", r
);
591 odata
.unwritable
= false;
594 //get the size attribute
596 r
= cls_cxx_getxattr(hctx
, "size", &size
);
598 CLS_LOG(20, "rebalance read: error getting xattr %s: %d", "size", r
);
601 odata
.size
= atoi(string(size
.c_str(), size
.length()).c_str());
603 //check if it needs to be balanced
604 r
= assert_size_in_bound(hctx
, bound
, comparator
);
606 CLS_LOG(20, "rebalance read: error on asserting size: %d", r
);
610 //if the assert succeeded, it needs to be balanced
612 r
= cls_cxx_map_get_vals(hctx
, "", "", LONG_MAX
, &odata
.omap
, &more
);
614 CLS_LOG(20, "rebalance read: getting kvs failed with error %d", r
);
618 CLS_LOG(20, "rebalance read: size xattr is %llu, omap size is %llu",
619 (unsigned long long)odata
.size
,
620 (unsigned long long)odata
.omap
.size());
// Class-method entry point for maybe_read_for_balance.
// Takes its argument `op` from `in` (buffer::error is caught on decode
// failure) and calls maybe_read_for_balance(hctx, op.odata, op.bound,
// op.comparator).
624 static int maybe_read_for_balance_op(cls_method_context_t hctx
,
625 bufferlist
*in
, bufferlist
*out
) {
626 CLS_LOG(20, "maybe_read_for_balance");
628 auto it
= in
->cbegin();
631 } catch (buffer::error
& err
) {
634 int r
= maybe_read_for_balance(hctx
, op
.odata
, op
.bound
, op
.comparator
);
// Class registration: declares one handle per method, registers the class
// under the name "kvs", and registers each *_op entry point under its
// method name.
// NOTE(review): the message below says "assert condition class" while the
// class is registered as "kvs" — confirm the wording.
646 CLS_LOG(20, "Loaded assert condition class!");
648 cls_handle_t h_class
;
649 cls_method_handle_t h_get_idata_from_key
;
650 cls_method_handle_t h_get_next_idata
;
651 cls_method_handle_t h_get_prev_idata
;
652 cls_method_handle_t h_read_many
;
653 cls_method_handle_t h_check_writable
;
654 cls_method_handle_t h_assert_size_in_bound
;
655 cls_method_handle_t h_omap_insert
;
656 cls_method_handle_t h_create_with_omap
;
657 cls_method_handle_t h_omap_remove
;
658 cls_method_handle_t h_maybe_read_for_balance
;
660 cls_register("kvs", &h_class
);
661 cls_register_cxx_method(h_class
, "get_idata_from_key",
663 get_idata_from_key_op
, &h_get_idata_from_key
);
664 cls_register_cxx_method(h_class
, "get_next_idata",
666 get_next_idata_op
, &h_get_next_idata
);
667 cls_register_cxx_method(h_class
, "get_prev_idata",
669 get_prev_idata_op
, &h_get_prev_idata
);
670 cls_register_cxx_method(h_class
, "read_many",
672 read_many_op
, &h_read_many
);
// NOTE(review): check_writable is registered with both RD and WR flags,
// while check_writable() as visible only calls cls_cxx_getxattr — confirm
// the WR flag is intentional.
673 cls_register_cxx_method(h_class
, "check_writable",
674 CLS_METHOD_RD
| CLS_METHOD_WR
,
675 check_writable_op
, &h_check_writable
);
676 cls_register_cxx_method(h_class
, "assert_size_in_bound",
678 assert_size_in_bound_op
, &h_assert_size_in_bound
);
679 cls_register_cxx_method(h_class
, "omap_insert",
681 omap_insert_op
, &h_omap_insert
);
682 cls_register_cxx_method(h_class
, "create_with_omap",
684 create_with_omap_op
, &h_create_with_omap
);
685 cls_register_cxx_method(h_class
, "omap_remove",
687 omap_remove_op
, &h_omap_remove
);
688 cls_register_cxx_method(h_class
, "maybe_read_for_balance",
690 maybe_read_for_balance_op
, &h_maybe_read_for_balance
);