]> git.proxmox.com Git - ceph.git/blob - ceph/src/key_value_store/cls_kvs.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / key_value_store / cls_kvs.cc
1 /*
2 * OSD classes for the key value store
3 *
4 * Created on: Aug 10, 2012
5 * Author: Eleanor Cawthon
6 */
7
8 #include "objclass/objclass.h"
9 #include <errno.h>
10 #include "key_value_store/kvs_arg_types.h"
11 #include "include/types.h"
12 #include <iostream>
13 #include <climits>
14
15
16 /**
17 * finds the index_data where a key belongs.
18 *
19 * @param key: the key to search for
20 * @param idata: the index_data for the first index value such that idata.key
21 * is greater than key.
22 * @param next_idata: the index_data for the next index entry after idata
23 * @pre: key is not encoded
24 * @post: idata contains complete information
25 * stored
26 */
27 static int get_idata_from_key(cls_method_context_t hctx, const string &key,
28 index_data &idata, index_data &next_idata) {
29 bufferlist raw_val;
30 int r = 0;
31 std::map<std::string, bufferlist> kvmap;
32
33 r = cls_cxx_map_get_vals(hctx, key_data(key).encoded(), "", 2, &kvmap);
34 if (r < 0) {
35 CLS_LOG(20, "error reading index for range %s: %d", key.c_str(), r);
36 return r;
37 }
38
39 r = cls_cxx_map_get_val(hctx, key_data(key).encoded(), &raw_val);
40 if (r == 0){
41 CLS_LOG(20, "%s is already in the index: %d", key.c_str(), r);
42 bufferlist::iterator b = raw_val.begin();
43 idata.decode(b);
44 if (!kvmap.empty()) {
45 bufferlist::iterator b = kvmap.begin()->second.begin();
46 next_idata.decode(b);
47 }
48 return r;
49 } else if (r == -ENOENT || r == -ENODATA) {
50 bufferlist::iterator b = kvmap.begin()->second.begin();
51 idata.decode(b);
52 if (idata.kdata.prefix != "1") {
53 bufferlist::iterator nb = (++kvmap.begin())->second.begin();
54 next_idata.decode(nb);
55 }
56 r = 0;
57 } else if (r < 0) {
58 CLS_LOG(20, "error reading index for duplicates %s: %d", key.c_str(), r);
59 return r;
60 }
61
62 CLS_LOG(20, "idata is %s", idata.str().c_str());
63 return r;
64 }
65
66
67 static int get_idata_from_key_op(cls_method_context_t hctx,
68 bufferlist *in, bufferlist *out) {
69 CLS_LOG(20, "get_idata_from_key_op");
70 idata_from_key_args op;
71 bufferlist::iterator it = in->begin();
72 try {
73 ::decode(op, it);
74 } catch (buffer::error& err) {
75 CLS_LOG(20, "error decoding idata_from_key_args.");
76 return -EINVAL;
77 }
78 int r = get_idata_from_key(hctx, op.key, op.idata, op.next_idata);
79 if (r < 0) {
80 return r;
81 } else {
82 ::encode(op, *out);
83 return 0;
84 }
85 }
86
87 /**
88 * finds the object in the index with the lowest key value that is greater
89 * than idata.key. If idata.key is the max key, returns -EOVERFLOW. If
90 * idata has a prefix and has timed out, cleans up.
91 *
92 * @param idata: idata for the object to search for.
93 * @param out_data: the idata for the next object.
94 *
95 * @pre: idata must contain a key.
96 * @post: out_data contains complete information
97 */
98 static int get_next_idata(cls_method_context_t hctx, const index_data &idata,
99 index_data &out_data) {
100 int r = 0;
101 std::map<std::string, bufferlist> kvs;
102 r = cls_cxx_map_get_vals(hctx, idata.kdata.encoded(), "", 1, &kvs);
103 if (r < 0){
104 CLS_LOG(20, "getting kvs failed with error %d", r);
105 return r;
106 }
107
108 if (!kvs.empty()) {
109 out_data.kdata.parse(kvs.begin()->first);
110 bufferlist::iterator b = kvs.begin()->second.begin();
111 out_data.decode(b);
112 } else {
113 r = -EOVERFLOW;
114 }
115
116 return r;
117 }
118
119 static int get_next_idata_op(cls_method_context_t hctx,
120 bufferlist *in, bufferlist *out) {
121 CLS_LOG(20, "get_next_idata_op");
122 idata_from_idata_args op;
123 bufferlist::iterator it = in->begin();
124 try {
125 ::decode(op, it);
126 } catch (buffer::error& err) {
127 return -EINVAL;
128 }
129 int r = get_next_idata(hctx, op.idata, op.next_idata);
130 if (r < 0) {
131 return r;
132 } else {
133 op.encode(*out);
134 return 0;
135 }
136 }
137
138 /**
139 * finds the object in the index with the highest key value that is less
140 * than idata.key. If idata.key is the lowest key, returns -ERANGE If
141 * idata has a prefix and has timed out, cleans up.
142 *
143 * @param idata: idata for the object to search for.
144 * @param out_data: the idata for the next object.
145 *
146 * @pre: idata must contain a key.
147 * @ost: out_data contains complete information
148 */
149 static int get_prev_idata(cls_method_context_t hctx, const index_data &idata,
150 index_data &out_data) {
151 int r = 0;
152 std::map<std::string, bufferlist> kvs;
153 r = cls_cxx_map_get_vals(hctx, "", "", LONG_MAX, &kvs);
154 if (r < 0){
155 CLS_LOG(20, "getting kvs failed with error %d", r);
156 return r;
157 }
158
159 std::map<std::string, bufferlist>::iterator it =
160 kvs.lower_bound(idata.kdata.encoded());
161 if (it->first != idata.kdata.encoded()) {
162 CLS_LOG(20, "object %s not found in the index (expected %s, found %s)",
163 idata.str().c_str(), idata.kdata.encoded().c_str(),
164 it->first.c_str());
165 return -ENODATA;
166 }
167 if (it == kvs.begin()) {
168 //it is the first object, there is no previous.
169 return -ERANGE;
170 } else {
171 --it;
172 }
173 out_data.kdata.parse(it->first);
174 bufferlist::iterator b = it->second.begin();
175 out_data.decode(b);
176
177 return 0;
178 }
179
180 static int get_prev_idata_op(cls_method_context_t hctx,
181 bufferlist *in, bufferlist *out) {
182 CLS_LOG(20, "get_next_idata_op");
183 idata_from_idata_args op;
184 bufferlist::iterator it = in->begin();
185 try {
186 ::decode(op, it);
187 } catch (buffer::error& err) {
188 return -EINVAL;
189 }
190 int r = get_prev_idata(hctx, op.idata, op.next_idata);
191 if (r < 0) {
192 return r;
193 } else {
194 op.encode(*out);
195 return 0;
196 }
197 }
198
199 /**
200 * Read all of the index entries where any keys in the map go
201 */
202 static int read_many(cls_method_context_t hctx, const set<string> &keys,
203 map<string, bufferlist> * out) {
204 int r = 0;
205 CLS_ERR("reading from a map of size %d, first key encoded is %s",
206 (int)keys.size(), key_data(*keys.begin()).encoded().c_str());
207 r = cls_cxx_map_get_vals(hctx, key_data(*keys.begin()).encoded().c_str(),
208 "", LONG_MAX, out);
209 if (r < 0) {
210 CLS_ERR("getting omap vals failed with error %d", r);
211 }
212
213 CLS_ERR("got map of size %d ", (int)out->size());
214 if (out->size() > 1) {
215 out->erase(out->upper_bound(key_data(*keys.rbegin()).encoded().c_str()),
216 out->end());
217 }
218 CLS_ERR("returning map of size %d", (int)out->size());
219 return r;
220 }
221
222 static int read_many_op(cls_method_context_t hctx, bufferlist *in,
223 bufferlist *out) {
224 CLS_LOG(20, "read_many_op");
225 set<string> op;
226 map<string, bufferlist> outmap;
227 bufferlist::iterator it = in->begin();
228 try {
229 ::decode(op, it);
230 } catch (buffer::error & err) {
231 return -EINVAL;
232 }
233 int r = read_many(hctx, op, &outmap);
234 if (r < 0) {
235 return r;
236 } else {
237 encode(outmap, *out);
238 return 0;
239 }
240 }
241
242 /**
243 * Checks the unwritable xattr. If it is "1" (i.e., it is unwritable), returns
244 * -EACCES. otherwise, returns 0.
245 */
246 static int check_writable(cls_method_context_t hctx) {
247 bufferlist bl;
248 int r = cls_cxx_getxattr(hctx, "unwritable", &bl);
249 if (r < 0) {
250 CLS_LOG(20, "error reading xattr %s: %d", "unwritable", r);
251 return r;
252 }
253 if (string(bl.c_str(), bl.length()) == "1") {
254 return -EACCES;
255 } else{
256 return 0;
257 }
258 }
259
260 static int check_writable_op(cls_method_context_t hctx,
261 bufferlist *in, bufferlist *out) {
262 CLS_LOG(20, "check_writable_op");
263 return check_writable(hctx);
264 }
265
266 /**
267 * returns -EKEYREJECTED if size is outside of bound, according to comparator.
268 *
269 * @bound: the limit to test
270 * @comparator: should be CEPH_OSD_CMPXATTR_OP_[EQ|GT|LT]
271 */
272 static int assert_size_in_bound(cls_method_context_t hctx, int bound,
273 int comparator) {
274 //determine size
275 bufferlist size_bl;
276 int r = cls_cxx_getxattr(hctx, "size", &size_bl);
277 if (r < 0) {
278 CLS_LOG(20, "error reading xattr %s: %d", "size", r);
279 return r;
280 }
281
282 int size = atoi(string(size_bl.c_str(), size_bl.length()).c_str());
283 CLS_LOG(20, "size is %d, bound is %d", size, bound);
284
285 //compare size to comparator
286 switch (comparator) {
287 case CEPH_OSD_CMPXATTR_OP_EQ:
288 if (size != bound) {
289 return -EKEYREJECTED;
290 }
291 break;
292 case CEPH_OSD_CMPXATTR_OP_LT:
293 if (size >= bound) {
294 return -EKEYREJECTED;
295 }
296 break;
297 case CEPH_OSD_CMPXATTR_OP_GT:
298 if (size <= bound) {
299 return -EKEYREJECTED;
300 }
301 break;
302 default:
303 CLS_LOG(20, "invalid argument passed to assert_size_in_bound: %d",
304 comparator);
305 return -EINVAL;
306 }
307 return 0;
308 }
309
310 static int assert_size_in_bound_op(cls_method_context_t hctx,
311 bufferlist *in, bufferlist *out) {
312 CLS_LOG(20, "assert_size_in_bound_op");
313 assert_size_args op;
314 bufferlist::iterator it = in->begin();
315 try {
316 ::decode(op, it);
317 } catch (buffer::error& err) {
318 return -EINVAL;
319 }
320 return assert_size_in_bound(hctx, op.bound, op.comparator);
321 }
322
323 /**
324 * Attempts to insert omap into this object's omap.
325 *
326 * @return:
327 * if unwritable, returns -EACCES.
328 * if size > bound and key doesn't already exist in the omap, returns -EBALANCE.
329 * if exclusive is true, returns -EEXIST if any keys already exist.
330 *
331 * @post: object has omap entries inserted, and size xattr is updated
332 */
333 static int omap_insert(cls_method_context_t hctx,
334 const map<string, bufferlist> &omap, int bound, bool exclusive) {
335
336 uint64_t size;
337 time_t time;
338 int r = cls_cxx_stat(hctx, &size, &time);
339 if (r < 0) {
340 return r;
341 }
342 CLS_LOG(20, "inserting %s", omap.begin()->first.c_str());
343 r = check_writable(hctx);
344 if (r < 0) {
345 CLS_LOG(20, "omap_insert: this object is unwritable: %d", r);
346 return r;
347 }
348
349 int assert_bound = bound;
350
351 //if this is an exclusive insert, make sure the key doesn't already exist.
352 for (map<string, bufferlist>::const_iterator it = omap.begin();
353 it != omap.end(); ++it) {
354 bufferlist bl;
355 r = cls_cxx_map_get_val(hctx, it->first, &bl);
356 if (r == 0 && string(bl.c_str(), bl.length()) != ""){
357 if (exclusive) {
358 CLS_LOG(20, "error: this is an exclusive insert and %s exists.",
359 it->first.c_str());
360 return -EEXIST;
361 }
362 assert_bound++;
363 CLS_LOG(20, "increased assert_bound to %d", assert_bound);
364 } else if (r != -ENODATA && r != -ENOENT) {
365 CLS_LOG(20, "error reading omap val for %s: %d", it->first.c_str(), r);
366 return r;
367 }
368 }
369
370 bufferlist old_size;
371 r = cls_cxx_getxattr(hctx, "size", &old_size);
372 if (r < 0) {
373 CLS_LOG(20, "error reading xattr %s: %d", "size", r);
374 return r;
375 }
376
377 int old_size_int = atoi(string(old_size.c_str(), old_size.length()).c_str());
378
379 CLS_LOG(20, "asserting size is less than %d (bound is %d)", assert_bound, bound);
380 if (old_size_int >= assert_bound) {
381 return -EKEYREJECTED;
382 }
383
384 int new_size_int = old_size_int + omap.size() - (assert_bound - bound);
385 CLS_LOG(20, "old size is %d, new size is %d", old_size_int, new_size_int);
386 bufferlist new_size;
387 stringstream s;
388 s << new_size_int;
389 new_size.append(s.str());
390
391 r = cls_cxx_map_set_vals(hctx, &omap);
392 if (r < 0) {
393 CLS_LOG(20, "error setting omap: %d", r);
394 return r;
395 }
396
397 r = cls_cxx_setxattr(hctx, "size", &new_size);
398 if (r < 0) {
399 CLS_LOG(20, "error setting xattr %s: %d", "size", r);
400 return r;
401 }
402 CLS_LOG(20, "successfully inserted %s", omap.begin()->first.c_str());
403 return 0;
404 }
405
406 static int omap_insert_op(cls_method_context_t hctx,
407 bufferlist *in, bufferlist *out) {
408 CLS_LOG(20, "omap_insert");
409 omap_set_args op;
410 bufferlist::iterator it = in->begin();
411 try {
412 ::decode(op, it);
413 } catch (buffer::error& err) {
414 return -EINVAL;
415 }
416 return omap_insert(hctx, op.omap, op.bound, op.exclusive);
417 }
418
419 static int create_with_omap(cls_method_context_t hctx,
420 const map<string, bufferlist> &omap) {
421 CLS_LOG(20, "creating with omap: %s", omap.begin()->first.c_str());
422 //first make sure the object is writable
423 int r = cls_cxx_create(hctx, true);
424 if (r < 0) {
425 CLS_LOG(20, "omap create: creating failed: %d", r);
426 return r;
427 }
428
429 int new_size_int = omap.size();
430 CLS_LOG(20, "omap insert: new size is %d", new_size_int);
431 bufferlist new_size;
432 stringstream s;
433 s << new_size_int;
434 new_size.append(s.str());
435
436 r = cls_cxx_map_set_vals(hctx, &omap);
437 if (r < 0) {
438 CLS_LOG(20, "omap create: error setting omap: %d", r);
439 return r;
440 }
441
442 r = cls_cxx_setxattr(hctx, "size", &new_size);
443 if (r < 0) {
444 CLS_LOG(20, "omap create: error setting xattr %s: %d", "size", r);
445 return r;
446 }
447
448 bufferlist u;
449 u.append("0");
450 r = cls_cxx_setxattr(hctx, "unwritable", &u);
451 if (r < 0) {
452 CLS_LOG(20, "omap create: error setting xattr %s: %d", "unwritable", r);
453 return r;
454 }
455
456 CLS_LOG(20, "successfully created %s", omap.begin()->first.c_str());
457 return 0;
458 }
459
460 static int create_with_omap_op(cls_method_context_t hctx,
461 bufferlist *in, bufferlist *out) {
462 CLS_LOG(20, "omap_insert");
463 map<string, bufferlist> omap;
464 bufferlist::iterator it = in->begin();
465 try {
466 ::decode(omap, it);
467 } catch (buffer::error& err) {
468 return -EINVAL;
469 }
470 return create_with_omap(hctx, omap);
471 }
472
473 /**
474 * Attempts to remove omap from this object's omap.
475 *
476 * @return:
477 * if unwritable, returns -EACCES.
478 * if size < bound and key doesn't already exist in the omap, returns -EBALANCE.
479 * if any of the keys are not in this object, returns -ENODATA.
480 *
481 * @post: object has omap entries removed, and size xattr is updated
482 */
483 static int omap_remove(cls_method_context_t hctx,
484 const std::set<string> &omap, int bound) {
485 int r;
486 uint64_t size;
487 time_t time;
488 r = cls_cxx_stat(hctx, &size, &time);
489 if (r < 0) {
490 return r;
491 }
492
493 //first make sure the object is writable
494 r = check_writable(hctx);
495 if (r < 0) {
496 return r;
497 }
498
499 //check for existance of the key first
500 for (set<string>::const_iterator it = omap.begin();
501 it != omap.end(); ++it) {
502 bufferlist bl;
503 r = cls_cxx_map_get_val(hctx, *it, &bl);
504 if (r == -ENOENT || r == -ENODATA
505 || string(bl.c_str(), bl.length()) == ""){
506 return -ENODATA;
507 } else if (r < 0) {
508 CLS_LOG(20, "error reading omap val for %s: %d", it->c_str(), r);
509 return r;
510 }
511 }
512
513 //fail if removing from an object with only bound entries.
514 bufferlist old_size;
515 r = cls_cxx_getxattr(hctx, "size", &old_size);
516 if (r < 0) {
517 CLS_LOG(20, "error reading xattr %s: %d", "size", r);
518 return r;
519 }
520 int old_size_int = atoi(string(old_size.c_str(), old_size.length()).c_str());
521
522 CLS_LOG(20, "asserting size is greater than %d", bound);
523 if (old_size_int <= bound) {
524 return -EKEYREJECTED;
525 }
526
527 int new_size_int = old_size_int - omap.size();
528 CLS_LOG(20, "old size is %d, new size is %d", old_size_int, new_size_int);
529 bufferlist new_size;
530 stringstream s;
531 s << new_size_int;
532 new_size.append(s.str());
533
534 r = cls_cxx_setxattr(hctx, "size", &new_size);
535 if (r < 0) {
536 CLS_LOG(20, "error setting xattr %s: %d", "unwritable", r);
537 return r;
538 }
539
540 for (std::set<string>::const_iterator it = omap.begin();
541 it != omap.end(); ++it) {
542 r = cls_cxx_map_remove_key(hctx, *it);
543 if (r < 0) {
544 CLS_LOG(20, "error removing omap: %d", r);
545 return r;
546 }
547 }
548 return 0;
549 }
550
551 static int omap_remove_op(cls_method_context_t hctx,
552 bufferlist *in, bufferlist *out) {
553 CLS_LOG(20, "omap_remove");
554 omap_rm_args op;
555 bufferlist::iterator it = in->begin();
556 try {
557 ::decode(op, it);
558 } catch (buffer::error& err) {
559 return -EINVAL;
560 }
561 return omap_remove(hctx, op.omap, op.bound);
562 }
563
564 /**
565 * checks to see if this object needs to be split or rebalanced. if so, reads
566 * information about it.
567 *
568 * @post: if assert_size_in_bound(hctx, bound, comparator) succeeds,
569 * odata contains the size, omap, and unwritable attributes for this object.
570 * Otherwise, odata contains the size and unwritable attribute.
571 */
572 static int maybe_read_for_balance(cls_method_context_t hctx,
573 object_data &odata, int bound, int comparator) {
574 CLS_LOG(20, "rebalance reading");
575 //if unwritable, return
576 int r = check_writable(hctx);
577 if (r < 0) {
578 odata.unwritable = true;
579 CLS_LOG(20, "rebalance read: error getting xattr %s: %d", "unwritable", r);
580 return r;
581 } else {
582 odata.unwritable = false;
583 }
584
585 //get the size attribute
586 bufferlist size;
587 r = cls_cxx_getxattr(hctx, "size", &size);
588 if (r < 0) {
589 CLS_LOG(20, "rebalance read: error getting xattr %s: %d", "size", r);
590 return r;
591 }
592 odata.size = atoi(string(size.c_str(), size.length()).c_str());
593
594 //check if it needs to be balanced
595 r = assert_size_in_bound(hctx, bound, comparator);
596 if (r < 0) {
597 CLS_LOG(20, "rebalance read: error on asserting size: %d", r);
598 return -EBALANCE;
599 }
600
601 //if the assert succeeded, it needs to be balanced
602 r = cls_cxx_map_get_vals(hctx, "", "", LONG_MAX, &odata.omap);
603 if (r < 0){
604 CLS_LOG(20, "rebalance read: getting kvs failed with error %d", r);
605 return r;
606 }
607
608 CLS_LOG(20, "rebalance read: size xattr is %llu, omap size is %llu",
609 (unsigned long long)odata.size,
610 (unsigned long long)odata.omap.size());
611 return 0;
612 }
613
614 static int maybe_read_for_balance_op(cls_method_context_t hctx,
615 bufferlist *in, bufferlist *out) {
616 CLS_LOG(20, "maybe_read_for_balance");
617 rebalance_args op;
618 bufferlist::iterator it = in->begin();
619 try {
620 ::decode(op, it);
621 } catch (buffer::error& err) {
622 return -EINVAL;
623 }
624 int r = maybe_read_for_balance(hctx, op.odata, op.bound, op.comparator);
625 if (r < 0) {
626 return r;
627 } else {
628 op.encode(*out);
629 return 0;
630 }
631 }
632
633
634 CLS_INIT(kvs)
635 {
636 CLS_LOG(20, "Loaded assert condition class!");
637
638 cls_handle_t h_class;
639 cls_method_handle_t h_get_idata_from_key;
640 cls_method_handle_t h_get_next_idata;
641 cls_method_handle_t h_get_prev_idata;
642 cls_method_handle_t h_read_many;
643 cls_method_handle_t h_check_writable;
644 cls_method_handle_t h_assert_size_in_bound;
645 cls_method_handle_t h_omap_insert;
646 cls_method_handle_t h_create_with_omap;
647 cls_method_handle_t h_omap_remove;
648 cls_method_handle_t h_maybe_read_for_balance;
649
650 cls_register("kvs", &h_class);
651 cls_register_cxx_method(h_class, "get_idata_from_key",
652 CLS_METHOD_RD,
653 get_idata_from_key_op, &h_get_idata_from_key);
654 cls_register_cxx_method(h_class, "get_next_idata",
655 CLS_METHOD_RD,
656 get_next_idata_op, &h_get_next_idata);
657 cls_register_cxx_method(h_class, "get_prev_idata",
658 CLS_METHOD_RD,
659 get_prev_idata_op, &h_get_prev_idata);
660 cls_register_cxx_method(h_class, "read_many",
661 CLS_METHOD_RD,
662 read_many_op, &h_read_many);
663 cls_register_cxx_method(h_class, "check_writable",
664 CLS_METHOD_RD | CLS_METHOD_WR,
665 check_writable_op, &h_check_writable);
666 cls_register_cxx_method(h_class, "assert_size_in_bound",
667 CLS_METHOD_WR,
668 assert_size_in_bound_op, &h_assert_size_in_bound);
669 cls_register_cxx_method(h_class, "omap_insert",
670 CLS_METHOD_WR,
671 omap_insert_op, &h_omap_insert);
672 cls_register_cxx_method(h_class, "create_with_omap",
673 CLS_METHOD_WR,
674 create_with_omap_op, &h_create_with_omap);
675 cls_register_cxx_method(h_class, "omap_remove",
676 CLS_METHOD_WR,
677 omap_remove_op, &h_omap_remove);
678 cls_register_cxx_method(h_class, "maybe_read_for_balance",
679 CLS_METHOD_RD,
680 maybe_read_for_balance_op, &h_maybe_read_for_balance);
681
682 return;
683 }