2 * OSD classes for the key value store
4 * Created on: Aug 10, 2012
5 * Author: Eleanor Cawthon
8 #include "objclass/objclass.h"
10 #include "key_value_store/kvs_arg_types.h"
11 #include "include/types.h"
17 * finds the index_data where a key belongs.
19 * @param key: the key to search for
20 * @param idata: the index_data for the first index value such that idata.key
21 * is greater than key.
22 * @param next_idata: the index_data for the next index entry after idata
23 * @pre: key is not encoded
24 * @post: idata contains complete information
27 static int get_idata_from_key(cls_method_context_t hctx
, const string
&key
,
28 index_data
&idata
, index_data
&next_idata
) {
31 std::map
<std::string
, bufferlist
> kvmap
;
33 r
= cls_cxx_map_get_vals(hctx
, key_data(key
).encoded(), "", 2, &kvmap
);
35 CLS_LOG(20, "error reading index for range %s: %d", key
.c_str(), r
);
39 r
= cls_cxx_map_get_val(hctx
, key_data(key
).encoded(), &raw_val
);
41 CLS_LOG(20, "%s is already in the index: %d", key
.c_str(), r
);
42 bufferlist::iterator b
= raw_val
.begin();
45 bufferlist::iterator b
= kvmap
.begin()->second
.begin();
49 } else if (r
== -ENOENT
|| r
== -ENODATA
) {
50 bufferlist::iterator b
= kvmap
.begin()->second
.begin();
52 if (idata
.kdata
.prefix
!= "1") {
53 bufferlist::iterator nb
= (++kvmap
.begin())->second
.begin();
54 next_idata
.decode(nb
);
58 CLS_LOG(20, "error reading index for duplicates %s: %d", key
.c_str(), r
);
62 CLS_LOG(20, "idata is %s", idata
.str().c_str());
67 static int get_idata_from_key_op(cls_method_context_t hctx
,
68 bufferlist
*in
, bufferlist
*out
) {
69 CLS_LOG(20, "get_idata_from_key_op");
70 idata_from_key_args op
;
71 bufferlist::iterator it
= in
->begin();
74 } catch (buffer::error
& err
) {
75 CLS_LOG(20, "error decoding idata_from_key_args.");
78 int r
= get_idata_from_key(hctx
, op
.key
, op
.idata
, op
.next_idata
);
88 * finds the object in the index with the lowest key value that is greater
89 * than idata.key. If idata.key is the max key, returns -EOVERFLOW. If
90 * idata has a prefix and has timed out, cleans up.
92 * @param idata: idata for the object to search for.
93 * @param out_data: the idata for the next object.
95 * @pre: idata must contain a key.
96 * @post: out_data contains complete information
98 static int get_next_idata(cls_method_context_t hctx
, const index_data
&idata
,
99 index_data
&out_data
) {
101 std::map
<std::string
, bufferlist
> kvs
;
102 r
= cls_cxx_map_get_vals(hctx
, idata
.kdata
.encoded(), "", 1, &kvs
);
104 CLS_LOG(20, "getting kvs failed with error %d", r
);
109 out_data
.kdata
.parse(kvs
.begin()->first
);
110 bufferlist::iterator b
= kvs
.begin()->second
.begin();
119 static int get_next_idata_op(cls_method_context_t hctx
,
120 bufferlist
*in
, bufferlist
*out
) {
121 CLS_LOG(20, "get_next_idata_op");
122 idata_from_idata_args op
;
123 bufferlist::iterator it
= in
->begin();
126 } catch (buffer::error
& err
) {
129 int r
= get_next_idata(hctx
, op
.idata
, op
.next_idata
);
139 * finds the object in the index with the highest key value that is less
140 * than idata.key. If idata.key is the lowest key, returns -ERANGE. If
141 * idata has a prefix and has timed out, cleans up.
143 * @param idata: idata for the object to search for.
144 * @param out_data: the idata for the previous object.
146 * @pre: idata must contain a key.
147 * @post: out_data contains complete information
149 static int get_prev_idata(cls_method_context_t hctx
, const index_data
&idata
,
150 index_data
&out_data
) {
152 std::map
<std::string
, bufferlist
> kvs
;
153 r
= cls_cxx_map_get_vals(hctx
, "", "", LONG_MAX
, &kvs
);
155 CLS_LOG(20, "getting kvs failed with error %d", r
);
159 std::map
<std::string
, bufferlist
>::iterator it
=
160 kvs
.lower_bound(idata
.kdata
.encoded());
161 if (it
->first
!= idata
.kdata
.encoded()) {
162 CLS_LOG(20, "object %s not found in the index (expected %s, found %s)",
163 idata
.str().c_str(), idata
.kdata
.encoded().c_str(),
167 if (it
== kvs
.begin()) {
168 //it is the first object, there is no previous.
173 out_data
.kdata
.parse(it
->first
);
174 bufferlist::iterator b
= it
->second
.begin();
180 static int get_prev_idata_op(cls_method_context_t hctx
,
181 bufferlist
*in
, bufferlist
*out
) {
182 CLS_LOG(20, "get_next_idata_op");
183 idata_from_idata_args op
;
184 bufferlist::iterator it
= in
->begin();
187 } catch (buffer::error
& err
) {
190 int r
= get_prev_idata(hctx
, op
.idata
, op
.next_idata
);
200 * Read all of the index entries where any keys in the map go
202 static int read_many(cls_method_context_t hctx
, const set
<string
> &keys
,
203 map
<string
, bufferlist
> * out
) {
205 CLS_ERR("reading from a map of size %d, first key encoded is %s",
206 (int)keys
.size(), key_data(*keys
.begin()).encoded().c_str());
207 r
= cls_cxx_map_get_vals(hctx
, key_data(*keys
.begin()).encoded().c_str(),
210 CLS_ERR("getting omap vals failed with error %d", r
);
213 CLS_ERR("got map of size %d ", (int)out
->size());
214 if (out
->size() > 1) {
215 out
->erase(out
->upper_bound(key_data(*keys
.rbegin()).encoded().c_str()),
218 CLS_ERR("returning map of size %d", (int)out
->size());
222 static int read_many_op(cls_method_context_t hctx
, bufferlist
*in
,
224 CLS_LOG(20, "read_many_op");
226 map
<string
, bufferlist
> outmap
;
227 bufferlist::iterator it
= in
->begin();
230 } catch (buffer::error
& err
) {
233 int r
= read_many(hctx
, op
, &outmap
);
237 encode(outmap
, *out
);
243 * Checks the unwritable xattr. If it is "1" (i.e., it is unwritable), returns
244 * -EACCES. Otherwise, returns 0.
246 static int check_writable(cls_method_context_t hctx
) {
248 int r
= cls_cxx_getxattr(hctx
, "unwritable", &bl
);
250 CLS_LOG(20, "error reading xattr %s: %d", "unwritable", r
);
253 if (string(bl
.c_str(), bl
.length()) == "1") {
260 static int check_writable_op(cls_method_context_t hctx
,
261 bufferlist
*in
, bufferlist
*out
) {
262 CLS_LOG(20, "check_writable_op");
263 return check_writable(hctx
);
267 * returns -EKEYREJECTED if size is outside of bound, according to comparator.
269 * @bound: the limit to test
270 * @comparator: should be CEPH_OSD_CMPXATTR_OP_[EQ|GT|LT]
272 static int assert_size_in_bound(cls_method_context_t hctx
, int bound
,
276 int r
= cls_cxx_getxattr(hctx
, "size", &size_bl
);
278 CLS_LOG(20, "error reading xattr %s: %d", "size", r
);
282 int size
= atoi(string(size_bl
.c_str(), size_bl
.length()).c_str());
283 CLS_LOG(20, "size is %d, bound is %d", size
, bound
);
285 //compare size to comparator
286 switch (comparator
) {
287 case CEPH_OSD_CMPXATTR_OP_EQ
:
289 return -EKEYREJECTED
;
292 case CEPH_OSD_CMPXATTR_OP_LT
:
294 return -EKEYREJECTED
;
297 case CEPH_OSD_CMPXATTR_OP_GT
:
299 return -EKEYREJECTED
;
303 CLS_LOG(20, "invalid argument passed to assert_size_in_bound: %d",
310 static int assert_size_in_bound_op(cls_method_context_t hctx
,
311 bufferlist
*in
, bufferlist
*out
) {
312 CLS_LOG(20, "assert_size_in_bound_op");
314 bufferlist::iterator it
= in
->begin();
317 } catch (buffer::error
& err
) {
320 return assert_size_in_bound(hctx
, op
.bound
, op
.comparator
);
324 * Attempts to insert omap into this object's omap.
327 * if unwritable, returns -EACCES.
328 * if size >= bound (the bound is raised for keys that already exist in the omap), returns -EKEYREJECTED.
329 * if exclusive is true, returns -EEXIST if any keys already exist.
331 * @post: object has omap entries inserted, and size xattr is updated
333 static int omap_insert(cls_method_context_t hctx
,
334 const map
<string
, bufferlist
> &omap
, int bound
, bool exclusive
) {
338 int r
= cls_cxx_stat(hctx
, &size
, &time
);
342 CLS_LOG(20, "inserting %s", omap
.begin()->first
.c_str());
343 r
= check_writable(hctx
);
345 CLS_LOG(20, "omap_insert: this object is unwritable: %d", r
);
349 int assert_bound
= bound
;
351 //if this is an exclusive insert, make sure the key doesn't already exist.
352 for (map
<string
, bufferlist
>::const_iterator it
= omap
.begin();
353 it
!= omap
.end(); ++it
) {
355 r
= cls_cxx_map_get_val(hctx
, it
->first
, &bl
);
356 if (r
== 0 && string(bl
.c_str(), bl
.length()) != ""){
358 CLS_LOG(20, "error: this is an exclusive insert and %s exists.",
363 CLS_LOG(20, "increased assert_bound to %d", assert_bound
);
364 } else if (r
!= -ENODATA
&& r
!= -ENOENT
) {
365 CLS_LOG(20, "error reading omap val for %s: %d", it
->first
.c_str(), r
);
371 r
= cls_cxx_getxattr(hctx
, "size", &old_size
);
373 CLS_LOG(20, "error reading xattr %s: %d", "size", r
);
377 int old_size_int
= atoi(string(old_size
.c_str(), old_size
.length()).c_str());
379 CLS_LOG(20, "asserting size is less than %d (bound is %d)", assert_bound
, bound
);
380 if (old_size_int
>= assert_bound
) {
381 return -EKEYREJECTED
;
384 int new_size_int
= old_size_int
+ omap
.size() - (assert_bound
- bound
);
385 CLS_LOG(20, "old size is %d, new size is %d", old_size_int
, new_size_int
);
389 new_size
.append(s
.str());
391 r
= cls_cxx_map_set_vals(hctx
, &omap
);
393 CLS_LOG(20, "error setting omap: %d", r
);
397 r
= cls_cxx_setxattr(hctx
, "size", &new_size
);
399 CLS_LOG(20, "error setting xattr %s: %d", "size", r
);
402 CLS_LOG(20, "successfully inserted %s", omap
.begin()->first
.c_str());
406 static int omap_insert_op(cls_method_context_t hctx
,
407 bufferlist
*in
, bufferlist
*out
) {
408 CLS_LOG(20, "omap_insert");
410 bufferlist::iterator it
= in
->begin();
413 } catch (buffer::error
& err
) {
416 return omap_insert(hctx
, op
.omap
, op
.bound
, op
.exclusive
);
419 static int create_with_omap(cls_method_context_t hctx
,
420 const map
<string
, bufferlist
> &omap
) {
421 CLS_LOG(20, "creating with omap: %s", omap
.begin()->first
.c_str());
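422 //first create the object (NOTE(review): no writability check happens here; the `true` argument is presumably the exclusive-create flag - confirm)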
423 int r
= cls_cxx_create(hctx
, true);
425 CLS_LOG(20, "omap create: creating failed: %d", r
);
429 int new_size_int
= omap
.size();
430 CLS_LOG(20, "omap insert: new size is %d", new_size_int
);
434 new_size
.append(s
.str());
436 r
= cls_cxx_map_set_vals(hctx
, &omap
);
438 CLS_LOG(20, "omap create: error setting omap: %d", r
);
442 r
= cls_cxx_setxattr(hctx
, "size", &new_size
);
444 CLS_LOG(20, "omap create: error setting xattr %s: %d", "size", r
);
450 r
= cls_cxx_setxattr(hctx
, "unwritable", &u
);
452 CLS_LOG(20, "omap create: error setting xattr %s: %d", "unwritable", r
);
456 CLS_LOG(20, "successfully created %s", omap
.begin()->first
.c_str());
460 static int create_with_omap_op(cls_method_context_t hctx
,
461 bufferlist
*in
, bufferlist
*out
) {
462 CLS_LOG(20, "omap_insert");
463 map
<string
, bufferlist
> omap
;
464 bufferlist::iterator it
= in
->begin();
467 } catch (buffer::error
& err
) {
470 return create_with_omap(hctx
, omap
);
474 * Attempts to remove omap from this object's omap.
477 * if unwritable, returns -EACCES.
478 * if size <= bound, returns -EKEYREJECTED.
479 * if any of the keys are not in this object, returns -ENODATA.
481 * @post: object has omap entries removed, and size xattr is updated
483 static int omap_remove(cls_method_context_t hctx
,
484 const std::set
<string
> &omap
, int bound
) {
488 r
= cls_cxx_stat(hctx
, &size
, &time
);
493 //first make sure the object is writable
494 r
= check_writable(hctx
);
499 //check for existence of the key first
500 for (set
<string
>::const_iterator it
= omap
.begin();
501 it
!= omap
.end(); ++it
) {
503 r
= cls_cxx_map_get_val(hctx
, *it
, &bl
);
504 if (r
== -ENOENT
|| r
== -ENODATA
505 || string(bl
.c_str(), bl
.length()) == ""){
508 CLS_LOG(20, "error reading omap val for %s: %d", it
->c_str(), r
);
513 //fail if removing from an object with only bound entries.
515 r
= cls_cxx_getxattr(hctx
, "size", &old_size
);
517 CLS_LOG(20, "error reading xattr %s: %d", "size", r
);
520 int old_size_int
= atoi(string(old_size
.c_str(), old_size
.length()).c_str());
522 CLS_LOG(20, "asserting size is greater than %d", bound
);
523 if (old_size_int
<= bound
) {
524 return -EKEYREJECTED
;
527 int new_size_int
= old_size_int
- omap
.size();
528 CLS_LOG(20, "old size is %d, new size is %d", old_size_int
, new_size_int
);
532 new_size
.append(s
.str());
534 r
= cls_cxx_setxattr(hctx
, "size", &new_size
);
536 CLS_LOG(20, "error setting xattr %s: %d", "unwritable", r
);
540 for (std::set
<string
>::const_iterator it
= omap
.begin();
541 it
!= omap
.end(); ++it
) {
542 r
= cls_cxx_map_remove_key(hctx
, *it
);
544 CLS_LOG(20, "error removing omap: %d", r
);
551 static int omap_remove_op(cls_method_context_t hctx
,
552 bufferlist
*in
, bufferlist
*out
) {
553 CLS_LOG(20, "omap_remove");
555 bufferlist::iterator it
= in
->begin();
558 } catch (buffer::error
& err
) {
561 return omap_remove(hctx
, op
.omap
, op
.bound
);
565 * checks to see if this object needs to be split or rebalanced. if so, reads
566 * information about it.
568 * @post: if assert_size_in_bound(hctx, bound, comparator) succeeds,
569 * odata contains the size, omap, and unwritable attributes for this object.
570 * Otherwise, odata contains the size and unwritable attribute.
572 static int maybe_read_for_balance(cls_method_context_t hctx
,
573 object_data
&odata
, int bound
, int comparator
) {
574 CLS_LOG(20, "rebalance reading");
575 //if unwritable, return
576 int r
= check_writable(hctx
);
578 odata
.unwritable
= true;
579 CLS_LOG(20, "rebalance read: error getting xattr %s: %d", "unwritable", r
);
582 odata
.unwritable
= false;
585 //get the size attribute
587 r
= cls_cxx_getxattr(hctx
, "size", &size
);
589 CLS_LOG(20, "rebalance read: error getting xattr %s: %d", "size", r
);
592 odata
.size
= atoi(string(size
.c_str(), size
.length()).c_str());
594 //check if it needs to be balanced
595 r
= assert_size_in_bound(hctx
, bound
, comparator
);
597 CLS_LOG(20, "rebalance read: error on asserting size: %d", r
);
601 //if the assert succeeded, it needs to be balanced
602 r
= cls_cxx_map_get_vals(hctx
, "", "", LONG_MAX
, &odata
.omap
);
604 CLS_LOG(20, "rebalance read: getting kvs failed with error %d", r
);
608 CLS_LOG(20, "rebalance read: size xattr is %llu, omap size is %llu",
609 (unsigned long long)odata
.size
,
610 (unsigned long long)odata
.omap
.size());
614 static int maybe_read_for_balance_op(cls_method_context_t hctx
,
615 bufferlist
*in
, bufferlist
*out
) {
616 CLS_LOG(20, "maybe_read_for_balance");
618 bufferlist::iterator it
= in
->begin();
621 } catch (buffer::error
& err
) {
624 int r
= maybe_read_for_balance(hctx
, op
.odata
, op
.bound
, op
.comparator
);
636 CLS_LOG(20, "Loaded assert condition class!");
638 cls_handle_t h_class
;
639 cls_method_handle_t h_get_idata_from_key
;
640 cls_method_handle_t h_get_next_idata
;
641 cls_method_handle_t h_get_prev_idata
;
642 cls_method_handle_t h_read_many
;
643 cls_method_handle_t h_check_writable
;
644 cls_method_handle_t h_assert_size_in_bound
;
645 cls_method_handle_t h_omap_insert
;
646 cls_method_handle_t h_create_with_omap
;
647 cls_method_handle_t h_omap_remove
;
648 cls_method_handle_t h_maybe_read_for_balance
;
650 cls_register("kvs", &h_class
);
651 cls_register_cxx_method(h_class
, "get_idata_from_key",
653 get_idata_from_key_op
, &h_get_idata_from_key
);
654 cls_register_cxx_method(h_class
, "get_next_idata",
656 get_next_idata_op
, &h_get_next_idata
);
657 cls_register_cxx_method(h_class
, "get_prev_idata",
659 get_prev_idata_op
, &h_get_prev_idata
);
660 cls_register_cxx_method(h_class
, "read_many",
662 read_many_op
, &h_read_many
);
663 cls_register_cxx_method(h_class
, "check_writable",
664 CLS_METHOD_RD
| CLS_METHOD_WR
,
665 check_writable_op
, &h_check_writable
);
666 cls_register_cxx_method(h_class
, "assert_size_in_bound",
668 assert_size_in_bound_op
, &h_assert_size_in_bound
);
669 cls_register_cxx_method(h_class
, "omap_insert",
671 omap_insert_op
, &h_omap_insert
);
672 cls_register_cxx_method(h_class
, "create_with_omap",
674 create_with_omap_op
, &h_create_with_omap
);
675 cls_register_cxx_method(h_class
, "omap_remove",
677 omap_remove_op
, &h_omap_remove
);
678 cls_register_cxx_method(h_class
, "maybe_read_for_balance",
680 maybe_read_for_balance_op
, &h_maybe_read_for_balance
);