]> git.proxmox.com Git - ceph.git/blob - ceph/src/key_value_store/cls_kvs.cc
update sources to v12.1.2
[ceph.git] / ceph / src / key_value_store / cls_kvs.cc
1 /*
2 * OSD classes for the key value store
3 *
4 * Created on: Aug 10, 2012
5 * Author: Eleanor Cawthon
6 */
7
8 #include "objclass/objclass.h"
9 #include <errno.h>
10 #include "key_value_store/kvs_arg_types.h"
11 #include "include/types.h"
12 #include <iostream>
13 #include <climits>
14
15
16 /**
17 * finds the index_data where a key belongs.
18 *
19 * @param key: the key to search for
20 * @param idata: the index_data for the first index value such that idata.key
21 * is greater than key.
22 * @param next_idata: the index_data for the next index entry after idata
23 * @pre: key is not encoded
24 * @post: idata contains complete information
25 * stored
26 */
27 static int get_idata_from_key(cls_method_context_t hctx, const string &key,
28 index_data &idata, index_data &next_idata) {
29 bufferlist raw_val;
30 int r = 0;
31 std::map<std::string, bufferlist> kvmap;
32
33 bool more;
34
35 r = cls_cxx_map_get_vals(hctx, key_data(key).encoded(), "", 2, &kvmap, &more);
36 if (r < 0) {
37 CLS_LOG(20, "error reading index for range %s: %d", key.c_str(), r);
38 return r;
39 }
40
41 r = cls_cxx_map_get_val(hctx, key_data(key).encoded(), &raw_val);
42 if (r == 0){
43 CLS_LOG(20, "%s is already in the index: %d", key.c_str(), r);
44 bufferlist::iterator b = raw_val.begin();
45 idata.decode(b);
46 if (!kvmap.empty()) {
47 bufferlist::iterator b = kvmap.begin()->second.begin();
48 next_idata.decode(b);
49 }
50 return r;
51 } else if (r == -ENOENT || r == -ENODATA) {
52 bufferlist::iterator b = kvmap.begin()->second.begin();
53 idata.decode(b);
54 if (idata.kdata.prefix != "1") {
55 bufferlist::iterator nb = (++kvmap.begin())->second.begin();
56 next_idata.decode(nb);
57 }
58 r = 0;
59 } else if (r < 0) {
60 CLS_LOG(20, "error reading index for duplicates %s: %d", key.c_str(), r);
61 return r;
62 }
63
64 CLS_LOG(20, "idata is %s", idata.str().c_str());
65 return r;
66 }
67
68
69 static int get_idata_from_key_op(cls_method_context_t hctx,
70 bufferlist *in, bufferlist *out) {
71 CLS_LOG(20, "get_idata_from_key_op");
72 idata_from_key_args op;
73 bufferlist::iterator it = in->begin();
74 try {
75 ::decode(op, it);
76 } catch (buffer::error& err) {
77 CLS_LOG(20, "error decoding idata_from_key_args.");
78 return -EINVAL;
79 }
80 int r = get_idata_from_key(hctx, op.key, op.idata, op.next_idata);
81 if (r < 0) {
82 return r;
83 } else {
84 ::encode(op, *out);
85 return 0;
86 }
87 }
88
89 /**
90 * finds the object in the index with the lowest key value that is greater
91 * than idata.key. If idata.key is the max key, returns -EOVERFLOW. If
92 * idata has a prefix and has timed out, cleans up.
93 *
94 * @param idata: idata for the object to search for.
95 * @param out_data: the idata for the next object.
96 *
97 * @pre: idata must contain a key.
98 * @post: out_data contains complete information
99 */
100 static int get_next_idata(cls_method_context_t hctx, const index_data &idata,
101 index_data &out_data) {
102 int r = 0;
103 std::map<std::string, bufferlist> kvs;
104 bool more;
105 r = cls_cxx_map_get_vals(hctx, idata.kdata.encoded(), "", 1, &kvs, &more);
106 if (r < 0){
107 CLS_LOG(20, "getting kvs failed with error %d", r);
108 return r;
109 }
110
111 if (!kvs.empty()) {
112 out_data.kdata.parse(kvs.begin()->first);
113 bufferlist::iterator b = kvs.begin()->second.begin();
114 out_data.decode(b);
115 } else {
116 r = -EOVERFLOW;
117 }
118
119 return r;
120 }
121
122 static int get_next_idata_op(cls_method_context_t hctx,
123 bufferlist *in, bufferlist *out) {
124 CLS_LOG(20, "get_next_idata_op");
125 idata_from_idata_args op;
126 bufferlist::iterator it = in->begin();
127 try {
128 ::decode(op, it);
129 } catch (buffer::error& err) {
130 return -EINVAL;
131 }
132 int r = get_next_idata(hctx, op.idata, op.next_idata);
133 if (r < 0) {
134 return r;
135 } else {
136 op.encode(*out);
137 return 0;
138 }
139 }
140
141 /**
142 * finds the object in the index with the highest key value that is less
143 * than idata.key. If idata.key is the lowest key, returns -ERANGE If
144 * idata has a prefix and has timed out, cleans up.
145 *
146 * @param idata: idata for the object to search for.
147 * @param out_data: the idata for the next object.
148 *
149 * @pre: idata must contain a key.
150 * @ost: out_data contains complete information
151 */
152 static int get_prev_idata(cls_method_context_t hctx, const index_data &idata,
153 index_data &out_data) {
154 int r = 0;
155 std::map<std::string, bufferlist> kvs;
156 bool more;
157 r = cls_cxx_map_get_vals(hctx, "", "", LONG_MAX, &kvs, &more);
158 if (r < 0){
159 CLS_LOG(20, "getting kvs failed with error %d", r);
160 return r;
161 }
162
163 std::map<std::string, bufferlist>::iterator it =
164 kvs.lower_bound(idata.kdata.encoded());
165 if (it->first != idata.kdata.encoded()) {
166 CLS_LOG(20, "object %s not found in the index (expected %s, found %s)",
167 idata.str().c_str(), idata.kdata.encoded().c_str(),
168 it->first.c_str());
169 return -ENODATA;
170 }
171 if (it == kvs.begin()) {
172 //it is the first object, there is no previous.
173 return -ERANGE;
174 } else {
175 --it;
176 }
177 out_data.kdata.parse(it->first);
178 bufferlist::iterator b = it->second.begin();
179 out_data.decode(b);
180
181 return 0;
182 }
183
184 static int get_prev_idata_op(cls_method_context_t hctx,
185 bufferlist *in, bufferlist *out) {
186 CLS_LOG(20, "get_next_idata_op");
187 idata_from_idata_args op;
188 bufferlist::iterator it = in->begin();
189 try {
190 ::decode(op, it);
191 } catch (buffer::error& err) {
192 return -EINVAL;
193 }
194 int r = get_prev_idata(hctx, op.idata, op.next_idata);
195 if (r < 0) {
196 return r;
197 } else {
198 op.encode(*out);
199 return 0;
200 }
201 }
202
203 /**
204 * Read all of the index entries where any keys in the map go
205 */
206 static int read_many(cls_method_context_t hctx, const set<string> &keys,
207 map<string, bufferlist> * out) {
208 int r = 0;
209 bool more;
210 CLS_ERR("reading from a map of size %d, first key encoded is %s",
211 (int)keys.size(), key_data(*keys.begin()).encoded().c_str());
212 r = cls_cxx_map_get_vals(hctx, key_data(*keys.begin()).encoded().c_str(),
213 "", LONG_MAX, out, &more);
214 if (r < 0) {
215 CLS_ERR("getting omap vals failed with error %d", r);
216 }
217
218 CLS_ERR("got map of size %d ", (int)out->size());
219 if (out->size() > 1) {
220 out->erase(out->upper_bound(key_data(*keys.rbegin()).encoded().c_str()),
221 out->end());
222 }
223 CLS_ERR("returning map of size %d", (int)out->size());
224 return r;
225 }
226
227 static int read_many_op(cls_method_context_t hctx, bufferlist *in,
228 bufferlist *out) {
229 CLS_LOG(20, "read_many_op");
230 set<string> op;
231 map<string, bufferlist> outmap;
232 bufferlist::iterator it = in->begin();
233 try {
234 ::decode(op, it);
235 } catch (buffer::error & err) {
236 return -EINVAL;
237 }
238 int r = read_many(hctx, op, &outmap);
239 if (r < 0) {
240 return r;
241 } else {
242 encode(outmap, *out);
243 return 0;
244 }
245 }
246
247 /**
248 * Checks the unwritable xattr. If it is "1" (i.e., it is unwritable), returns
249 * -EACCES. otherwise, returns 0.
250 */
251 static int check_writable(cls_method_context_t hctx) {
252 bufferlist bl;
253 int r = cls_cxx_getxattr(hctx, "unwritable", &bl);
254 if (r < 0) {
255 CLS_LOG(20, "error reading xattr %s: %d", "unwritable", r);
256 return r;
257 }
258 if (string(bl.c_str(), bl.length()) == "1") {
259 return -EACCES;
260 } else{
261 return 0;
262 }
263 }
264
265 static int check_writable_op(cls_method_context_t hctx,
266 bufferlist *in, bufferlist *out) {
267 CLS_LOG(20, "check_writable_op");
268 return check_writable(hctx);
269 }
270
271 /**
272 * returns -EKEYREJECTED if size is outside of bound, according to comparator.
273 *
274 * @bound: the limit to test
275 * @comparator: should be CEPH_OSD_CMPXATTR_OP_[EQ|GT|LT]
276 */
277 static int assert_size_in_bound(cls_method_context_t hctx, int bound,
278 int comparator) {
279 //determine size
280 bufferlist size_bl;
281 int r = cls_cxx_getxattr(hctx, "size", &size_bl);
282 if (r < 0) {
283 CLS_LOG(20, "error reading xattr %s: %d", "size", r);
284 return r;
285 }
286
287 int size = atoi(string(size_bl.c_str(), size_bl.length()).c_str());
288 CLS_LOG(20, "size is %d, bound is %d", size, bound);
289
290 //compare size to comparator
291 switch (comparator) {
292 case CEPH_OSD_CMPXATTR_OP_EQ:
293 if (size != bound) {
294 return -EKEYREJECTED;
295 }
296 break;
297 case CEPH_OSD_CMPXATTR_OP_LT:
298 if (size >= bound) {
299 return -EKEYREJECTED;
300 }
301 break;
302 case CEPH_OSD_CMPXATTR_OP_GT:
303 if (size <= bound) {
304 return -EKEYREJECTED;
305 }
306 break;
307 default:
308 CLS_LOG(20, "invalid argument passed to assert_size_in_bound: %d",
309 comparator);
310 return -EINVAL;
311 }
312 return 0;
313 }
314
315 static int assert_size_in_bound_op(cls_method_context_t hctx,
316 bufferlist *in, bufferlist *out) {
317 CLS_LOG(20, "assert_size_in_bound_op");
318 assert_size_args op;
319 bufferlist::iterator it = in->begin();
320 try {
321 ::decode(op, it);
322 } catch (buffer::error& err) {
323 return -EINVAL;
324 }
325 return assert_size_in_bound(hctx, op.bound, op.comparator);
326 }
327
328 /**
329 * Attempts to insert omap into this object's omap.
330 *
331 * @return:
332 * if unwritable, returns -EACCES.
333 * if size > bound and key doesn't already exist in the omap, returns -EBALANCE.
334 * if exclusive is true, returns -EEXIST if any keys already exist.
335 *
336 * @post: object has omap entries inserted, and size xattr is updated
337 */
338 static int omap_insert(cls_method_context_t hctx,
339 const map<string, bufferlist> &omap, int bound, bool exclusive) {
340
341 uint64_t size;
342 time_t time;
343 int r = cls_cxx_stat(hctx, &size, &time);
344 if (r < 0) {
345 return r;
346 }
347 CLS_LOG(20, "inserting %s", omap.begin()->first.c_str());
348 r = check_writable(hctx);
349 if (r < 0) {
350 CLS_LOG(20, "omap_insert: this object is unwritable: %d", r);
351 return r;
352 }
353
354 int assert_bound = bound;
355
356 //if this is an exclusive insert, make sure the key doesn't already exist.
357 for (map<string, bufferlist>::const_iterator it = omap.begin();
358 it != omap.end(); ++it) {
359 bufferlist bl;
360 r = cls_cxx_map_get_val(hctx, it->first, &bl);
361 if (r == 0 && string(bl.c_str(), bl.length()) != ""){
362 if (exclusive) {
363 CLS_LOG(20, "error: this is an exclusive insert and %s exists.",
364 it->first.c_str());
365 return -EEXIST;
366 }
367 assert_bound++;
368 CLS_LOG(20, "increased assert_bound to %d", assert_bound);
369 } else if (r != -ENODATA && r != -ENOENT) {
370 CLS_LOG(20, "error reading omap val for %s: %d", it->first.c_str(), r);
371 return r;
372 }
373 }
374
375 bufferlist old_size;
376 r = cls_cxx_getxattr(hctx, "size", &old_size);
377 if (r < 0) {
378 CLS_LOG(20, "error reading xattr %s: %d", "size", r);
379 return r;
380 }
381
382 int old_size_int = atoi(string(old_size.c_str(), old_size.length()).c_str());
383
384 CLS_LOG(20, "asserting size is less than %d (bound is %d)", assert_bound, bound);
385 if (old_size_int >= assert_bound) {
386 return -EKEYREJECTED;
387 }
388
389 int new_size_int = old_size_int + omap.size() - (assert_bound - bound);
390 CLS_LOG(20, "old size is %d, new size is %d", old_size_int, new_size_int);
391 bufferlist new_size;
392 stringstream s;
393 s << new_size_int;
394 new_size.append(s.str());
395
396 r = cls_cxx_map_set_vals(hctx, &omap);
397 if (r < 0) {
398 CLS_LOG(20, "error setting omap: %d", r);
399 return r;
400 }
401
402 r = cls_cxx_setxattr(hctx, "size", &new_size);
403 if (r < 0) {
404 CLS_LOG(20, "error setting xattr %s: %d", "size", r);
405 return r;
406 }
407 CLS_LOG(20, "successfully inserted %s", omap.begin()->first.c_str());
408 return 0;
409 }
410
411 static int omap_insert_op(cls_method_context_t hctx,
412 bufferlist *in, bufferlist *out) {
413 CLS_LOG(20, "omap_insert");
414 omap_set_args op;
415 bufferlist::iterator it = in->begin();
416 try {
417 ::decode(op, it);
418 } catch (buffer::error& err) {
419 return -EINVAL;
420 }
421 return omap_insert(hctx, op.omap, op.bound, op.exclusive);
422 }
423
424 static int create_with_omap(cls_method_context_t hctx,
425 const map<string, bufferlist> &omap) {
426 CLS_LOG(20, "creating with omap: %s", omap.begin()->first.c_str());
427 //first make sure the object is writable
428 int r = cls_cxx_create(hctx, true);
429 if (r < 0) {
430 CLS_LOG(20, "omap create: creating failed: %d", r);
431 return r;
432 }
433
434 int new_size_int = omap.size();
435 CLS_LOG(20, "omap insert: new size is %d", new_size_int);
436 bufferlist new_size;
437 stringstream s;
438 s << new_size_int;
439 new_size.append(s.str());
440
441 r = cls_cxx_map_set_vals(hctx, &omap);
442 if (r < 0) {
443 CLS_LOG(20, "omap create: error setting omap: %d", r);
444 return r;
445 }
446
447 r = cls_cxx_setxattr(hctx, "size", &new_size);
448 if (r < 0) {
449 CLS_LOG(20, "omap create: error setting xattr %s: %d", "size", r);
450 return r;
451 }
452
453 bufferlist u;
454 u.append("0");
455 r = cls_cxx_setxattr(hctx, "unwritable", &u);
456 if (r < 0) {
457 CLS_LOG(20, "omap create: error setting xattr %s: %d", "unwritable", r);
458 return r;
459 }
460
461 CLS_LOG(20, "successfully created %s", omap.begin()->first.c_str());
462 return 0;
463 }
464
465 static int create_with_omap_op(cls_method_context_t hctx,
466 bufferlist *in, bufferlist *out) {
467 CLS_LOG(20, "omap_insert");
468 map<string, bufferlist> omap;
469 bufferlist::iterator it = in->begin();
470 try {
471 ::decode(omap, it);
472 } catch (buffer::error& err) {
473 return -EINVAL;
474 }
475 return create_with_omap(hctx, omap);
476 }
477
478 /**
479 * Attempts to remove omap from this object's omap.
480 *
481 * @return:
482 * if unwritable, returns -EACCES.
483 * if size < bound and key doesn't already exist in the omap, returns -EBALANCE.
484 * if any of the keys are not in this object, returns -ENODATA.
485 *
486 * @post: object has omap entries removed, and size xattr is updated
487 */
488 static int omap_remove(cls_method_context_t hctx,
489 const std::set<string> &omap, int bound) {
490 int r;
491 uint64_t size;
492 time_t time;
493 r = cls_cxx_stat(hctx, &size, &time);
494 if (r < 0) {
495 return r;
496 }
497
498 //first make sure the object is writable
499 r = check_writable(hctx);
500 if (r < 0) {
501 return r;
502 }
503
504 //check for existance of the key first
505 for (set<string>::const_iterator it = omap.begin();
506 it != omap.end(); ++it) {
507 bufferlist bl;
508 r = cls_cxx_map_get_val(hctx, *it, &bl);
509 if (r == -ENOENT || r == -ENODATA
510 || string(bl.c_str(), bl.length()) == ""){
511 return -ENODATA;
512 } else if (r < 0) {
513 CLS_LOG(20, "error reading omap val for %s: %d", it->c_str(), r);
514 return r;
515 }
516 }
517
518 //fail if removing from an object with only bound entries.
519 bufferlist old_size;
520 r = cls_cxx_getxattr(hctx, "size", &old_size);
521 if (r < 0) {
522 CLS_LOG(20, "error reading xattr %s: %d", "size", r);
523 return r;
524 }
525 int old_size_int = atoi(string(old_size.c_str(), old_size.length()).c_str());
526
527 CLS_LOG(20, "asserting size is greater than %d", bound);
528 if (old_size_int <= bound) {
529 return -EKEYREJECTED;
530 }
531
532 int new_size_int = old_size_int - omap.size();
533 CLS_LOG(20, "old size is %d, new size is %d", old_size_int, new_size_int);
534 bufferlist new_size;
535 stringstream s;
536 s << new_size_int;
537 new_size.append(s.str());
538
539 r = cls_cxx_setxattr(hctx, "size", &new_size);
540 if (r < 0) {
541 CLS_LOG(20, "error setting xattr %s: %d", "unwritable", r);
542 return r;
543 }
544
545 for (std::set<string>::const_iterator it = omap.begin();
546 it != omap.end(); ++it) {
547 r = cls_cxx_map_remove_key(hctx, *it);
548 if (r < 0) {
549 CLS_LOG(20, "error removing omap: %d", r);
550 return r;
551 }
552 }
553 return 0;
554 }
555
556 static int omap_remove_op(cls_method_context_t hctx,
557 bufferlist *in, bufferlist *out) {
558 CLS_LOG(20, "omap_remove");
559 omap_rm_args op;
560 bufferlist::iterator it = in->begin();
561 try {
562 ::decode(op, it);
563 } catch (buffer::error& err) {
564 return -EINVAL;
565 }
566 return omap_remove(hctx, op.omap, op.bound);
567 }
568
569 /**
570 * checks to see if this object needs to be split or rebalanced. if so, reads
571 * information about it.
572 *
573 * @post: if assert_size_in_bound(hctx, bound, comparator) succeeds,
574 * odata contains the size, omap, and unwritable attributes for this object.
575 * Otherwise, odata contains the size and unwritable attribute.
576 */
577 static int maybe_read_for_balance(cls_method_context_t hctx,
578 object_data &odata, int bound, int comparator) {
579 CLS_LOG(20, "rebalance reading");
580 //if unwritable, return
581 int r = check_writable(hctx);
582 if (r < 0) {
583 odata.unwritable = true;
584 CLS_LOG(20, "rebalance read: error getting xattr %s: %d", "unwritable", r);
585 return r;
586 } else {
587 odata.unwritable = false;
588 }
589
590 //get the size attribute
591 bufferlist size;
592 r = cls_cxx_getxattr(hctx, "size", &size);
593 if (r < 0) {
594 CLS_LOG(20, "rebalance read: error getting xattr %s: %d", "size", r);
595 return r;
596 }
597 odata.size = atoi(string(size.c_str(), size.length()).c_str());
598
599 //check if it needs to be balanced
600 r = assert_size_in_bound(hctx, bound, comparator);
601 if (r < 0) {
602 CLS_LOG(20, "rebalance read: error on asserting size: %d", r);
603 return -EBALANCE;
604 }
605
606 //if the assert succeeded, it needs to be balanced
607 bool more;
608 r = cls_cxx_map_get_vals(hctx, "", "", LONG_MAX, &odata.omap, &more);
609 if (r < 0){
610 CLS_LOG(20, "rebalance read: getting kvs failed with error %d", r);
611 return r;
612 }
613
614 CLS_LOG(20, "rebalance read: size xattr is %llu, omap size is %llu",
615 (unsigned long long)odata.size,
616 (unsigned long long)odata.omap.size());
617 return 0;
618 }
619
620 static int maybe_read_for_balance_op(cls_method_context_t hctx,
621 bufferlist *in, bufferlist *out) {
622 CLS_LOG(20, "maybe_read_for_balance");
623 rebalance_args op;
624 bufferlist::iterator it = in->begin();
625 try {
626 ::decode(op, it);
627 } catch (buffer::error& err) {
628 return -EINVAL;
629 }
630 int r = maybe_read_for_balance(hctx, op.odata, op.bound, op.comparator);
631 if (r < 0) {
632 return r;
633 } else {
634 op.encode(*out);
635 return 0;
636 }
637 }
638
639
640 CLS_INIT(kvs)
641 {
642 CLS_LOG(20, "Loaded assert condition class!");
643
644 cls_handle_t h_class;
645 cls_method_handle_t h_get_idata_from_key;
646 cls_method_handle_t h_get_next_idata;
647 cls_method_handle_t h_get_prev_idata;
648 cls_method_handle_t h_read_many;
649 cls_method_handle_t h_check_writable;
650 cls_method_handle_t h_assert_size_in_bound;
651 cls_method_handle_t h_omap_insert;
652 cls_method_handle_t h_create_with_omap;
653 cls_method_handle_t h_omap_remove;
654 cls_method_handle_t h_maybe_read_for_balance;
655
656 cls_register("kvs", &h_class);
657 cls_register_cxx_method(h_class, "get_idata_from_key",
658 CLS_METHOD_RD,
659 get_idata_from_key_op, &h_get_idata_from_key);
660 cls_register_cxx_method(h_class, "get_next_idata",
661 CLS_METHOD_RD,
662 get_next_idata_op, &h_get_next_idata);
663 cls_register_cxx_method(h_class, "get_prev_idata",
664 CLS_METHOD_RD,
665 get_prev_idata_op, &h_get_prev_idata);
666 cls_register_cxx_method(h_class, "read_many",
667 CLS_METHOD_RD,
668 read_many_op, &h_read_many);
669 cls_register_cxx_method(h_class, "check_writable",
670 CLS_METHOD_RD | CLS_METHOD_WR,
671 check_writable_op, &h_check_writable);
672 cls_register_cxx_method(h_class, "assert_size_in_bound",
673 CLS_METHOD_WR,
674 assert_size_in_bound_op, &h_assert_size_in_bound);
675 cls_register_cxx_method(h_class, "omap_insert",
676 CLS_METHOD_WR,
677 omap_insert_op, &h_omap_insert);
678 cls_register_cxx_method(h_class, "create_with_omap",
679 CLS_METHOD_WR,
680 create_with_omap_op, &h_create_with_omap);
681 cls_register_cxx_method(h_class, "omap_remove",
682 CLS_METHOD_WR,
683 omap_remove_op, &h_omap_remove);
684 cls_register_cxx_method(h_class, "maybe_read_for_balance",
685 CLS_METHOD_RD,
686 maybe_read_for_balance_op, &h_maybe_read_for_balance);
687
688 return;
689 }