]> git.proxmox.com Git - ceph.git/blob - ceph/src/key_value_store/cls_kvs.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / key_value_store / cls_kvs.cc
1 /*
2 * OSD classes for the key value store
3 *
4 * Created on: Aug 10, 2012
5 * Author: Eleanor Cawthon
6 */
7
8 #include "include/compat.h"
9 #include "objclass/objclass.h"
10 #include <errno.h>
11 #include "key_value_store/kvs_arg_types.h"
12 #include "include/types.h"
13 #include <iostream>
14 #include <climits>
15
16 using std::string;
17 using std::map;
18 using std::set;
19
20 /**
21 * finds the index_data where a key belongs.
22 *
23 * @param key: the key to search for
24 * @param idata: the index_data for the first index value such that idata.key
25 * is greater than key.
26 * @param next_idata: the index_data for the next index entry after idata
27 * @pre: key is not encoded
28 * @post: idata contains complete information
29 * stored
30 */
31 static int get_idata_from_key(cls_method_context_t hctx, const string &key,
32 index_data &idata, index_data &next_idata) {
33 bufferlist raw_val;
34 int r = 0;
35 std::map<std::string, bufferlist> kvmap;
36
37 bool more;
38
39 r = cls_cxx_map_get_vals(hctx, key_data(key).encoded(), "", 2, &kvmap, &more);
40 if (r < 0) {
41 CLS_LOG(20, "error reading index for range %s: %d", key.c_str(), r);
42 return r;
43 }
44
45 r = cls_cxx_map_get_val(hctx, key_data(key).encoded(), &raw_val);
46 if (r == 0){
47 CLS_LOG(20, "%s is already in the index: %d", key.c_str(), r);
48 auto b = raw_val.cbegin();
49 idata.decode(b);
50 if (!kvmap.empty()) {
51 auto b = kvmap.begin()->second.cbegin();
52 next_idata.decode(b);
53 }
54 return r;
55 } else if (r == -ENOENT || r == -ENODATA) {
56 auto b = kvmap.begin()->second.cbegin();
57 idata.decode(b);
58 if (idata.kdata.prefix != "1") {
59 auto nb = (++kvmap.begin())->second.cbegin();
60 next_idata.decode(nb);
61 }
62 r = 0;
63 } else if (r < 0) {
64 CLS_LOG(20, "error reading index for duplicates %s: %d", key.c_str(), r);
65 return r;
66 }
67
68 CLS_LOG(20, "idata is %s", idata.str().c_str());
69 return r;
70 }
71
72
73 static int get_idata_from_key_op(cls_method_context_t hctx,
74 bufferlist *in, bufferlist *out) {
75 CLS_LOG(20, "get_idata_from_key_op");
76 idata_from_key_args op;
77 auto it = in->cbegin();
78 try {
79 decode(op, it);
80 } catch (buffer::error& err) {
81 CLS_LOG(20, "error decoding idata_from_key_args.");
82 return -EINVAL;
83 }
84 int r = get_idata_from_key(hctx, op.key, op.idata, op.next_idata);
85 if (r < 0) {
86 return r;
87 } else {
88 encode(op, *out);
89 return 0;
90 }
91 }
92
93 /**
94 * finds the object in the index with the lowest key value that is greater
95 * than idata.key. If idata.key is the max key, returns -EOVERFLOW. If
96 * idata has a prefix and has timed out, cleans up.
97 *
98 * @param idata: idata for the object to search for.
99 * @param out_data: the idata for the next object.
100 *
101 * @pre: idata must contain a key.
102 * @post: out_data contains complete information
103 */
104 static int get_next_idata(cls_method_context_t hctx, const index_data &idata,
105 index_data &out_data) {
106 int r = 0;
107 std::map<std::string, bufferlist> kvs;
108 bool more;
109 r = cls_cxx_map_get_vals(hctx, idata.kdata.encoded(), "", 1, &kvs, &more);
110 if (r < 0){
111 CLS_LOG(20, "getting kvs failed with error %d", r);
112 return r;
113 }
114
115 if (!kvs.empty()) {
116 out_data.kdata.parse(kvs.begin()->first);
117 auto b = kvs.begin()->second.cbegin();
118 out_data.decode(b);
119 } else {
120 r = -EOVERFLOW;
121 }
122
123 return r;
124 }
125
126 static int get_next_idata_op(cls_method_context_t hctx,
127 bufferlist *in, bufferlist *out) {
128 CLS_LOG(20, "get_next_idata_op");
129 idata_from_idata_args op;
130 auto it = in->cbegin();
131 try {
132 decode(op, it);
133 } catch (buffer::error& err) {
134 return -EINVAL;
135 }
136 int r = get_next_idata(hctx, op.idata, op.next_idata);
137 if (r < 0) {
138 return r;
139 } else {
140 op.encode(*out);
141 return 0;
142 }
143 }
144
145 /**
146 * finds the object in the index with the highest key value that is less
147 * than idata.key. If idata.key is the lowest key, returns -ERANGE If
148 * idata has a prefix and has timed out, cleans up.
149 *
150 * @param idata: idata for the object to search for.
151 * @param out_data: the idata for the next object.
152 *
153 * @pre: idata must contain a key.
154 * @ost: out_data contains complete information
155 */
156 static int get_prev_idata(cls_method_context_t hctx, const index_data &idata,
157 index_data &out_data) {
158 int r = 0;
159 std::map<std::string, bufferlist> kvs;
160 bool more;
161 r = cls_cxx_map_get_vals(hctx, "", "", LONG_MAX, &kvs, &more);
162 if (r < 0){
163 CLS_LOG(20, "getting kvs failed with error %d", r);
164 return r;
165 }
166
167 std::map<std::string, bufferlist>::iterator it =
168 kvs.lower_bound(idata.kdata.encoded());
169 if (it->first != idata.kdata.encoded()) {
170 CLS_LOG(20, "object %s not found in the index (expected %s, found %s)",
171 idata.str().c_str(), idata.kdata.encoded().c_str(),
172 it->first.c_str());
173 return -ENODATA;
174 }
175 if (it == kvs.begin()) {
176 //it is the first object, there is no previous.
177 return -ERANGE;
178 } else {
179 --it;
180 }
181 out_data.kdata.parse(it->first);
182 auto b = it->second.cbegin();
183 out_data.decode(b);
184
185 return 0;
186 }
187
188 static int get_prev_idata_op(cls_method_context_t hctx,
189 bufferlist *in, bufferlist *out) {
190 CLS_LOG(20, "get_next_idata_op");
191 idata_from_idata_args op;
192 auto it = in->cbegin();
193 try {
194 decode(op, it);
195 } catch (buffer::error& err) {
196 return -EINVAL;
197 }
198 int r = get_prev_idata(hctx, op.idata, op.next_idata);
199 if (r < 0) {
200 return r;
201 } else {
202 op.encode(*out);
203 return 0;
204 }
205 }
206
207 /**
208 * Read all of the index entries where any keys in the map go
209 */
210 static int read_many(cls_method_context_t hctx, const set<string> &keys,
211 map<string, bufferlist> * out) {
212 int r = 0;
213 bool more;
214 CLS_ERR("reading from a map of size %d, first key encoded is %s",
215 (int)keys.size(), key_data(*keys.begin()).encoded().c_str());
216 r = cls_cxx_map_get_vals(hctx, key_data(*keys.begin()).encoded().c_str(),
217 "", LONG_MAX, out, &more);
218 if (r < 0) {
219 CLS_ERR("getting omap vals failed with error %d", r);
220 }
221
222 CLS_ERR("got map of size %d ", (int)out->size());
223 if (out->size() > 1) {
224 out->erase(out->upper_bound(key_data(*keys.rbegin()).encoded().c_str()),
225 out->end());
226 }
227 CLS_ERR("returning map of size %d", (int)out->size());
228 return r;
229 }
230
231 static int read_many_op(cls_method_context_t hctx, bufferlist *in,
232 bufferlist *out) {
233 CLS_LOG(20, "read_many_op");
234 set<string> op;
235 map<string, bufferlist> outmap;
236 auto it = in->cbegin();
237 try {
238 decode(op, it);
239 } catch (buffer::error & err) {
240 return -EINVAL;
241 }
242 int r = read_many(hctx, op, &outmap);
243 if (r < 0) {
244 return r;
245 } else {
246 encode(outmap, *out);
247 return 0;
248 }
249 }
250
251 /**
252 * Checks the unwritable xattr. If it is "1" (i.e., it is unwritable), returns
253 * -EACCES. otherwise, returns 0.
254 */
255 static int check_writable(cls_method_context_t hctx) {
256 bufferlist bl;
257 int r = cls_cxx_getxattr(hctx, "unwritable", &bl);
258 if (r < 0) {
259 CLS_LOG(20, "error reading xattr %s: %d", "unwritable", r);
260 return r;
261 }
262 if (string(bl.c_str(), bl.length()) == "1") {
263 return -EACCES;
264 } else{
265 return 0;
266 }
267 }
268
269 static int check_writable_op(cls_method_context_t hctx,
270 bufferlist *in, bufferlist *out) {
271 CLS_LOG(20, "check_writable_op");
272 return check_writable(hctx);
273 }
274
275 /**
276 * returns -EKEYREJECTED if size is outside of bound, according to comparator.
277 *
278 * @bound: the limit to test
279 * @comparator: should be CEPH_OSD_CMPXATTR_OP_[EQ|GT|LT]
280 */
281 static int assert_size_in_bound(cls_method_context_t hctx, int bound,
282 int comparator) {
283 //determine size
284 bufferlist size_bl;
285 int r = cls_cxx_getxattr(hctx, "size", &size_bl);
286 if (r < 0) {
287 CLS_LOG(20, "error reading xattr %s: %d", "size", r);
288 return r;
289 }
290
291 int size = atoi(string(size_bl.c_str(), size_bl.length()).c_str());
292 CLS_LOG(20, "size is %d, bound is %d", size, bound);
293
294 //compare size to comparator
295 switch (comparator) {
296 case CEPH_OSD_CMPXATTR_OP_EQ:
297 if (size != bound) {
298 return -EKEYREJECTED;
299 }
300 break;
301 case CEPH_OSD_CMPXATTR_OP_LT:
302 if (size >= bound) {
303 return -EKEYREJECTED;
304 }
305 break;
306 case CEPH_OSD_CMPXATTR_OP_GT:
307 if (size <= bound) {
308 return -EKEYREJECTED;
309 }
310 break;
311 default:
312 CLS_LOG(20, "invalid argument passed to assert_size_in_bound: %d",
313 comparator);
314 return -EINVAL;
315 }
316 return 0;
317 }
318
319 static int assert_size_in_bound_op(cls_method_context_t hctx,
320 bufferlist *in, bufferlist *out) {
321 CLS_LOG(20, "assert_size_in_bound_op");
322 assert_size_args op;
323 auto it = in->cbegin();
324 try {
325 decode(op, it);
326 } catch (buffer::error& err) {
327 return -EINVAL;
328 }
329 return assert_size_in_bound(hctx, op.bound, op.comparator);
330 }
331
332 /**
333 * Attempts to insert omap into this object's omap.
334 *
335 * @return:
336 * if unwritable, returns -EACCES.
337 * if size > bound and key doesn't already exist in the omap, returns -EBALANCE.
338 * if exclusive is true, returns -EEXIST if any keys already exist.
339 *
340 * @post: object has omap entries inserted, and size xattr is updated
341 */
342 static int omap_insert(cls_method_context_t hctx,
343 const map<string, bufferlist> &omap, int bound, bool exclusive) {
344
345 uint64_t size;
346 time_t time;
347 int r = cls_cxx_stat(hctx, &size, &time);
348 if (r < 0) {
349 return r;
350 }
351 CLS_LOG(20, "inserting %s", omap.begin()->first.c_str());
352 r = check_writable(hctx);
353 if (r < 0) {
354 CLS_LOG(20, "omap_insert: this object is unwritable: %d", r);
355 return r;
356 }
357
358 int assert_bound = bound;
359
360 //if this is an exclusive insert, make sure the key doesn't already exist.
361 for (map<string, bufferlist>::const_iterator it = omap.begin();
362 it != omap.end(); ++it) {
363 bufferlist bl;
364 r = cls_cxx_map_get_val(hctx, it->first, &bl);
365 if (r == 0 && string(bl.c_str(), bl.length()) != ""){
366 if (exclusive) {
367 CLS_LOG(20, "error: this is an exclusive insert and %s exists.",
368 it->first.c_str());
369 return -EEXIST;
370 }
371 assert_bound++;
372 CLS_LOG(20, "increased assert_bound to %d", assert_bound);
373 } else if (r != -ENODATA && r != -ENOENT) {
374 CLS_LOG(20, "error reading omap val for %s: %d", it->first.c_str(), r);
375 return r;
376 }
377 }
378
379 bufferlist old_size;
380 r = cls_cxx_getxattr(hctx, "size", &old_size);
381 if (r < 0) {
382 CLS_LOG(20, "error reading xattr %s: %d", "size", r);
383 return r;
384 }
385
386 int old_size_int = atoi(string(old_size.c_str(), old_size.length()).c_str());
387
388 CLS_LOG(20, "asserting size is less than %d (bound is %d)", assert_bound, bound);
389 if (old_size_int >= assert_bound) {
390 return -EKEYREJECTED;
391 }
392
393 int new_size_int = old_size_int + omap.size() - (assert_bound - bound);
394 CLS_LOG(20, "old size is %d, new size is %d", old_size_int, new_size_int);
395 bufferlist new_size;
396 std::stringstream s;
397 s << new_size_int;
398 new_size.append(s.str());
399
400 r = cls_cxx_map_set_vals(hctx, &omap);
401 if (r < 0) {
402 CLS_LOG(20, "error setting omap: %d", r);
403 return r;
404 }
405
406 r = cls_cxx_setxattr(hctx, "size", &new_size);
407 if (r < 0) {
408 CLS_LOG(20, "error setting xattr %s: %d", "size", r);
409 return r;
410 }
411 CLS_LOG(20, "successfully inserted %s", omap.begin()->first.c_str());
412 return 0;
413 }
414
415 static int omap_insert_op(cls_method_context_t hctx,
416 bufferlist *in, bufferlist *out) {
417 CLS_LOG(20, "omap_insert");
418 omap_set_args op;
419 auto it = in->cbegin();
420 try {
421 decode(op, it);
422 } catch (buffer::error& err) {
423 return -EINVAL;
424 }
425 return omap_insert(hctx, op.omap, op.bound, op.exclusive);
426 }
427
428 static int create_with_omap(cls_method_context_t hctx,
429 const map<string, bufferlist> &omap) {
430 CLS_LOG(20, "creating with omap: %s", omap.begin()->first.c_str());
431 //first make sure the object is writable
432 int r = cls_cxx_create(hctx, true);
433 if (r < 0) {
434 CLS_LOG(20, "omap create: creating failed: %d", r);
435 return r;
436 }
437
438 int new_size_int = omap.size();
439 CLS_LOG(20, "omap insert: new size is %d", new_size_int);
440 bufferlist new_size;
441 std::stringstream s;
442 s << new_size_int;
443 new_size.append(s.str());
444
445 r = cls_cxx_map_set_vals(hctx, &omap);
446 if (r < 0) {
447 CLS_LOG(20, "omap create: error setting omap: %d", r);
448 return r;
449 }
450
451 r = cls_cxx_setxattr(hctx, "size", &new_size);
452 if (r < 0) {
453 CLS_LOG(20, "omap create: error setting xattr %s: %d", "size", r);
454 return r;
455 }
456
457 bufferlist u;
458 u.append("0");
459 r = cls_cxx_setxattr(hctx, "unwritable", &u);
460 if (r < 0) {
461 CLS_LOG(20, "omap create: error setting xattr %s: %d", "unwritable", r);
462 return r;
463 }
464
465 CLS_LOG(20, "successfully created %s", omap.begin()->first.c_str());
466 return 0;
467 }
468
469 static int create_with_omap_op(cls_method_context_t hctx,
470 bufferlist *in, bufferlist *out) {
471 CLS_LOG(20, "omap_insert");
472 map<string, bufferlist> omap;
473 auto it = in->cbegin();
474 try {
475 decode(omap, it);
476 } catch (buffer::error& err) {
477 return -EINVAL;
478 }
479 return create_with_omap(hctx, omap);
480 }
481
482 /**
483 * Attempts to remove omap from this object's omap.
484 *
485 * @return:
486 * if unwritable, returns -EACCES.
487 * if size < bound and key doesn't already exist in the omap, returns -EBALANCE.
488 * if any of the keys are not in this object, returns -ENODATA.
489 *
490 * @post: object has omap entries removed, and size xattr is updated
491 */
492 static int omap_remove(cls_method_context_t hctx,
493 const std::set<string> &omap, int bound) {
494 int r;
495 uint64_t size;
496 time_t time;
497 r = cls_cxx_stat(hctx, &size, &time);
498 if (r < 0) {
499 return r;
500 }
501
502 //first make sure the object is writable
503 r = check_writable(hctx);
504 if (r < 0) {
505 return r;
506 }
507
508 //check for existance of the key first
509 for (set<string>::const_iterator it = omap.begin();
510 it != omap.end(); ++it) {
511 bufferlist bl;
512 r = cls_cxx_map_get_val(hctx, *it, &bl);
513 if (r == -ENOENT || r == -ENODATA
514 || string(bl.c_str(), bl.length()) == ""){
515 return -ENODATA;
516 } else if (r < 0) {
517 CLS_LOG(20, "error reading omap val for %s: %d", it->c_str(), r);
518 return r;
519 }
520 }
521
522 //fail if removing from an object with only bound entries.
523 bufferlist old_size;
524 r = cls_cxx_getxattr(hctx, "size", &old_size);
525 if (r < 0) {
526 CLS_LOG(20, "error reading xattr %s: %d", "size", r);
527 return r;
528 }
529 int old_size_int = atoi(string(old_size.c_str(), old_size.length()).c_str());
530
531 CLS_LOG(20, "asserting size is greater than %d", bound);
532 if (old_size_int <= bound) {
533 return -EKEYREJECTED;
534 }
535
536 int new_size_int = old_size_int - omap.size();
537 CLS_LOG(20, "old size is %d, new size is %d", old_size_int, new_size_int);
538 bufferlist new_size;
539 std::stringstream s;
540 s << new_size_int;
541 new_size.append(s.str());
542
543 r = cls_cxx_setxattr(hctx, "size", &new_size);
544 if (r < 0) {
545 CLS_LOG(20, "error setting xattr %s: %d", "unwritable", r);
546 return r;
547 }
548
549 for (std::set<string>::const_iterator it = omap.begin();
550 it != omap.end(); ++it) {
551 r = cls_cxx_map_remove_key(hctx, *it);
552 if (r < 0) {
553 CLS_LOG(20, "error removing omap: %d", r);
554 return r;
555 }
556 }
557 return 0;
558 }
559
560 static int omap_remove_op(cls_method_context_t hctx,
561 bufferlist *in, bufferlist *out) {
562 CLS_LOG(20, "omap_remove");
563 omap_rm_args op;
564 auto it = in->cbegin();
565 try {
566 decode(op, it);
567 } catch (buffer::error& err) {
568 return -EINVAL;
569 }
570 return omap_remove(hctx, op.omap, op.bound);
571 }
572
573 /**
574 * checks to see if this object needs to be split or rebalanced. if so, reads
575 * information about it.
576 *
577 * @post: if assert_size_in_bound(hctx, bound, comparator) succeeds,
578 * odata contains the size, omap, and unwritable attributes for this object.
579 * Otherwise, odata contains the size and unwritable attribute.
580 */
581 static int maybe_read_for_balance(cls_method_context_t hctx,
582 object_data &odata, int bound, int comparator) {
583 CLS_LOG(20, "rebalance reading");
584 //if unwritable, return
585 int r = check_writable(hctx);
586 if (r < 0) {
587 odata.unwritable = true;
588 CLS_LOG(20, "rebalance read: error getting xattr %s: %d", "unwritable", r);
589 return r;
590 } else {
591 odata.unwritable = false;
592 }
593
594 //get the size attribute
595 bufferlist size;
596 r = cls_cxx_getxattr(hctx, "size", &size);
597 if (r < 0) {
598 CLS_LOG(20, "rebalance read: error getting xattr %s: %d", "size", r);
599 return r;
600 }
601 odata.size = atoi(string(size.c_str(), size.length()).c_str());
602
603 //check if it needs to be balanced
604 r = assert_size_in_bound(hctx, bound, comparator);
605 if (r < 0) {
606 CLS_LOG(20, "rebalance read: error on asserting size: %d", r);
607 return -EBALANCE;
608 }
609
610 //if the assert succeeded, it needs to be balanced
611 bool more;
612 r = cls_cxx_map_get_vals(hctx, "", "", LONG_MAX, &odata.omap, &more);
613 if (r < 0){
614 CLS_LOG(20, "rebalance read: getting kvs failed with error %d", r);
615 return r;
616 }
617
618 CLS_LOG(20, "rebalance read: size xattr is %llu, omap size is %llu",
619 (unsigned long long)odata.size,
620 (unsigned long long)odata.omap.size());
621 return 0;
622 }
623
624 static int maybe_read_for_balance_op(cls_method_context_t hctx,
625 bufferlist *in, bufferlist *out) {
626 CLS_LOG(20, "maybe_read_for_balance");
627 rebalance_args op;
628 auto it = in->cbegin();
629 try {
630 decode(op, it);
631 } catch (buffer::error& err) {
632 return -EINVAL;
633 }
634 int r = maybe_read_for_balance(hctx, op.odata, op.bound, op.comparator);
635 if (r < 0) {
636 return r;
637 } else {
638 op.encode(*out);
639 return 0;
640 }
641 }
642
643
644 CLS_INIT(kvs)
645 {
646 CLS_LOG(20, "Loaded assert condition class!");
647
648 cls_handle_t h_class;
649 cls_method_handle_t h_get_idata_from_key;
650 cls_method_handle_t h_get_next_idata;
651 cls_method_handle_t h_get_prev_idata;
652 cls_method_handle_t h_read_many;
653 cls_method_handle_t h_check_writable;
654 cls_method_handle_t h_assert_size_in_bound;
655 cls_method_handle_t h_omap_insert;
656 cls_method_handle_t h_create_with_omap;
657 cls_method_handle_t h_omap_remove;
658 cls_method_handle_t h_maybe_read_for_balance;
659
660 cls_register("kvs", &h_class);
661 cls_register_cxx_method(h_class, "get_idata_from_key",
662 CLS_METHOD_RD,
663 get_idata_from_key_op, &h_get_idata_from_key);
664 cls_register_cxx_method(h_class, "get_next_idata",
665 CLS_METHOD_RD,
666 get_next_idata_op, &h_get_next_idata);
667 cls_register_cxx_method(h_class, "get_prev_idata",
668 CLS_METHOD_RD,
669 get_prev_idata_op, &h_get_prev_idata);
670 cls_register_cxx_method(h_class, "read_many",
671 CLS_METHOD_RD,
672 read_many_op, &h_read_many);
673 cls_register_cxx_method(h_class, "check_writable",
674 CLS_METHOD_RD | CLS_METHOD_WR,
675 check_writable_op, &h_check_writable);
676 cls_register_cxx_method(h_class, "assert_size_in_bound",
677 CLS_METHOD_WR,
678 assert_size_in_bound_op, &h_assert_size_in_bound);
679 cls_register_cxx_method(h_class, "omap_insert",
680 CLS_METHOD_WR,
681 omap_insert_op, &h_omap_insert);
682 cls_register_cxx_method(h_class, "create_with_omap",
683 CLS_METHOD_WR,
684 create_with_omap_op, &h_create_with_omap);
685 cls_register_cxx_method(h_class, "omap_remove",
686 CLS_METHOD_WR,
687 omap_remove_op, &h_omap_remove);
688 cls_register_cxx_method(h_class, "maybe_read_for_balance",
689 CLS_METHOD_RD,
690 maybe_read_for_balance_op, &h_maybe_read_for_balance);
691
692 return;
693 }