2 * Key-value store using librados
6 * eleanor.cawthon@inktank.com
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
14 #include "include/compat.h"
15 #include "key_value_store/key_value_structure.h"
16 #include "key_value_store/kv_flat_btree_async.h"
17 #include "key_value_store/kvs_arg_types.h"
18 #include "include/rados/librados.hpp"
19 #include "common/ceph_context.h"
20 #include "common/Clock.h"
21 #include "include/types.h"
33 using ceph::bufferlist
;
35 bool index_data::is_timed_out(utime_t now
, utime_t timeout
) const {
36 return prefix
!= "" && now
- ts
> timeout
;
39 void IndexCache::clear() {
44 void IndexCache::push(const string
&key
, const index_data
&idata
) {
45 if (cache_size
== 0) {
49 map
<key_data
, pair
<index_data
, utime_t
> >::iterator old_it
=
50 k2itmap
.lower_bound(key_data(key
));
51 if (old_it
!= k2itmap
.end()) {
52 t2kmap
.erase(old_it
->second
.second
);
53 k2itmap
.erase(old_it
);
55 map
<key_data
, pair
<index_data
, utime_t
> >::iterator new_it
=
56 k2itmap
.find(idata
.kdata
);
57 if (new_it
!= k2itmap
.end()) {
58 utime_t old_time
= new_it
->second
.second
;
59 t2kmap
.erase(old_time
);
61 utime_t time
= ceph_clock_now();
62 k2itmap
[idata
.kdata
] = make_pair(idata
, time
);
63 t2kmap
[time
] = idata
.kdata
;
64 if ((int)k2itmap
.size() > cache_size
) {
70 void IndexCache::push(const index_data
&idata
) {
71 if (cache_size
== 0) {
74 if (k2itmap
.count(idata
.kdata
) > 0) {
75 utime_t old_time
= k2itmap
[idata
.kdata
].second
;
76 t2kmap
.erase(old_time
);
77 k2itmap
.erase(idata
.kdata
);
79 utime_t time
= ceph_clock_now();
80 k2itmap
[idata
.kdata
] = make_pair(idata
, time
);
81 t2kmap
[time
] = idata
.kdata
;
82 if ((int)k2itmap
.size() > cache_size
) {
87 void IndexCache::pop() {
88 if (cache_size
== 0) {
91 map
<utime_t
, key_data
>::iterator it
= t2kmap
.begin();
92 utime_t time
= it
->first
;
93 key_data kdata
= it
->second
;
98 void IndexCache::erase(key_data kdata
) {
99 if (cache_size
== 0) {
102 if (k2itmap
.count(kdata
) > 0) {
103 utime_t c
= k2itmap
[kdata
].second
;
104 k2itmap
.erase(kdata
);
109 int IndexCache::get(const string
&key
, index_data
*idata
) const {
110 if (cache_size
== 0) {
113 if ((int)k2itmap
.size() == 0) {
116 map
<key_data
, pair
<index_data
, utime_t
> >::const_iterator it
=
117 k2itmap
.lower_bound(key_data(key
));
118 if (it
== k2itmap
.end() || !(it
->second
.first
.min_kdata
< key_data(key
))) {
121 *idata
= it
->second
.first
;
126 int IndexCache::get(const string
&key
, index_data
*idata
,
127 index_data
*next_idata
) const {
128 if (cache_size
== 0) {
131 map
<key_data
, pair
<index_data
, utime_t
> >::const_iterator it
=
132 k2itmap
.lower_bound(key_data(key
));
133 if (it
== k2itmap
.end() || ++it
== k2itmap
.end()) {
137 if (!(it
->second
.first
.min_kdata
< key_data(key
))){
138 //stale, should be reread.
141 *idata
= it
->second
.first
;
143 if (it
!= k2itmap
.end()) {
144 *next_idata
= it
->second
.first
;
151 int KvFlatBtreeAsync::nothing() {
155 int KvFlatBtreeAsync::wait() {
156 if (rand() % 10 == 0) {
162 int KvFlatBtreeAsync::suicide() {
163 if (rand() % 10 == 0) {
164 if (verbose
) cout
<< client_name
<< " is suiciding" << std::endl
;
170 int KvFlatBtreeAsync::next(const index_data
&idata
, index_data
* out_data
)
172 if (verbose
) cout
<< "\t\t" << client_name
<< "-next: finding next of "
176 librados::ObjectReadOperation oro
;
177 std::map
<std::string
, bufferlist
> kvs
;
178 oro
.omap_get_vals2(idata
.kdata
.encoded(),1,&kvs
, nullptr, &err
);
179 err
= io_ctx
.operate(index_name
, &oro
, NULL
);
181 if (verbose
) cout
<< "\t\t\t" << client_name
182 << "-next: getting index failed with error "
187 out_data
->kdata
.parse(kvs
.begin()->first
);
188 auto b
= kvs
.begin()->second
.cbegin();
190 if (idata
.is_timed_out(ceph_clock_now(), timeout
)) {
191 if (verbose
) cout
<< client_name
<< " THINKS THE OTHER CLIENT DIED."
193 //the client died after deleting the object. clean up.
202 int KvFlatBtreeAsync::prev(const index_data
&idata
, index_data
* out_data
)
204 if (verbose
) cout
<< "\t\t" << client_name
<< "-prev: finding prev of "
205 << idata
.str() << std::endl
;
208 idata_from_idata_args in_args
;
209 in_args
.idata
= idata
;
210 in_args
.encode(inbl
);
212 err
= io_ctx
.exec(index_name
,"kvs", "get_prev_idata", inbl
, outbl
);
214 if (verbose
) cout
<< "\t\t\t" << client_name
215 << "-prev: getting index failed with error "
217 if (idata
.is_timed_out(ceph_clock_now(), timeout
)) {
218 if (verbose
) cout
<< client_name
<< " THINKS THE OTHER CLIENT DIED."
220 //the client died after deleting the object. clean up.
221 err
= cleanup(idata
, err
);
222 if (err
== -ESUICIDE
) {
230 auto it
= outbl
.cbegin();
232 *out_data
= in_args
.next_idata
;
233 if (verbose
) cout
<< "\t\t" << client_name
<< "-prev: prev is "
239 int KvFlatBtreeAsync::read_index(const string
&key
, index_data
* idata
,
240 index_data
* next_idata
, bool force_update
) {
243 if (verbose
) cout
<< "\t" << client_name
244 << "-read_index: getting index_data for " << key
245 << " from cache" << std::endl
;
247 if (next_idata
!= NULL
) {
248 err
= icache
.get(key
, idata
, next_idata
);
250 err
= icache
.get(key
, idata
);
252 icache_lock
.Unlock();
255 //if (verbose) cout << "CACHE SUCCESS" << std::endl;
258 if (verbose
) cout
<< "NOT IN CACHE" << std::endl
;
262 if (verbose
) cout
<< "\t" << client_name
263 << "-read_index: getting index_data for " << key
264 << " from object" << std::endl
;
265 librados::ObjectReadOperation oro
;
267 std::set
<std::string
> key_set
;
268 key_set
.insert(key_data(key
).encoded());
269 std::map
<std::string
, bufferlist
> kvmap
;
270 std::map
<std::string
, bufferlist
> dupmap
;
271 oro
.omap_get_vals_by_keys(key_set
, &dupmap
, &err
);
272 oro
.omap_get_vals2(key_data(key
).encoded(),
273 (cache_size
/ cache_refresh
>= 2? cache_size
/ cache_refresh
: 2),
274 &kvmap
, nullptr, &err
);
275 err
= io_ctx
.operate(index_name
, &oro
, NULL
);
276 utime_t mytime
= ceph_clock_now();
278 cerr
<< "\t" << client_name
279 << "-read_index: getting keys failed with "
281 ceph_abort_msg(client_name
+ "-read_index: reading index failed");
284 kvmap
.insert(dupmap
.begin(), dupmap
.end());
285 for (map
<string
, bufferlist
>::iterator it
= ++kvmap
.begin();
288 bufferlist bl
= it
->second
;
289 auto blit
= bl
.cbegin();
290 index_data this_idata
;
291 this_idata
.decode(blit
);
292 if (this_idata
.is_timed_out(mytime
, timeout
)) {
293 if (verbose
) cout
<< client_name
294 << " THINKS THE OTHER CLIENT DIED. (mytime is "
295 << mytime
.sec() << "." << mytime
.usec() << ", idata.ts is "
296 << this_idata
.ts
.sec() << "." << this_idata
.ts
.usec()
297 << ", it has been " << (mytime
- this_idata
.ts
).sec()
298 << '.' << (mytime
- this_idata
.ts
).usec()
299 << ", timeout is " << timeout
<< ")" << std::endl
;
300 //the client died after deleting the object. clean up.
301 if (cleanup(this_idata
, -EPREFIX
) == -ESUICIDE
) {
304 return read_index(key
, idata
, next_idata
, force_update
);
307 icache
.push(this_idata
);
308 icache_lock
.Unlock();
310 auto b
= kvmap
.begin()->second
.cbegin();
312 idata
->kdata
.parse(kvmap
.begin()->first
);
313 if (verbose
) cout
<< "\t" << client_name
<< "-read_index: kvmap_size is "
315 << ", idata is " << idata
->str() << std::endl
;
317 ceph_assert(idata
->obj
!= "");
319 icache
.push(key
, *idata
);
320 icache_lock
.Unlock();
322 if (next_idata
!= NULL
&& idata
->kdata
.prefix
!= "1") {
323 next_idata
->kdata
.parse((++kvmap
.begin())->first
);
324 auto nb
= (++kvmap
.begin())->second
.cbegin();
325 next_idata
->decode(nb
);
327 icache
.push(*next_idata
);
328 icache_lock
.Unlock();
333 int KvFlatBtreeAsync::split(const index_data
&idata
) {
337 if (idata
.prefix
!= "") {
342 args
.bound
= 2 * k
- 1;
343 args
.comparator
= CEPH_OSD_CMPXATTR_OP_GT
;
344 err
= read_object(idata
.obj
, &args
);
345 args
.odata
.max_kdata
= idata
.kdata
;
347 if (verbose
) cout
<< "\t\t" << client_name
<< "-split: read object "
349 << " got " << err
<< std::endl
;
353 if (verbose
) cout
<< "\t\t" << client_name
<< "-split: splitting "
355 << ", which has size " << args
.odata
.size
356 << " and actual size " << args
.odata
.omap
.size() << std::endl
;
358 ///////preparations that happen outside the critical section
360 vector
<object_data
> to_create
;
361 vector
<object_data
> to_delete
;
362 to_delete
.push_back(object_data(idata
.min_kdata
,
363 args
.odata
.max_kdata
, args
.odata
.name
, args
.odata
.version
));
365 //for lower half object
366 map
<std::string
, bufferlist
>::const_iterator it
= args
.odata
.omap
.begin();
367 client_index_lock
.Lock();
368 to_create
.push_back(object_data(to_string(client_name
, client_index
++)));
369 client_index_lock
.Unlock();
370 for (int i
= 0; i
< k
; i
++) {
371 to_create
[0].omap
.insert(*it
);
374 to_create
[0].min_kdata
= idata
.min_kdata
;
375 to_create
[0].max_kdata
= key_data(to_create
[0].omap
.rbegin()->first
);
377 //for upper half object
378 client_index_lock
.Lock();
379 to_create
.push_back(object_data(to_create
[0].max_kdata
,
380 args
.odata
.max_kdata
,
381 to_string(client_name
, client_index
++)));
382 client_index_lock
.Unlock();
383 to_create
[1].omap
.insert(
384 ++args
.odata
.omap
.find(to_create
[0].omap
.rbegin()->first
),
385 args
.odata
.omap
.end());
387 //setting up operations
388 librados::ObjectWriteOperation owos
[6];
389 vector
<pair
<pair
<int, string
>, librados::ObjectWriteOperation
*> > ops
;
391 set_up_prefix_index(to_create
, to_delete
, &owos
[0], &out_data
, &err
);
392 ops
.push_back(make_pair(
393 pair
<int, string
>(ADD_PREFIX
, index_name
),
395 for (int i
= 1; i
< 6; i
++) {
396 ops
.push_back(make_pair(make_pair(0,""), &owos
[i
]));
398 set_up_ops(to_create
, to_delete
, &ops
, out_data
, &err
);
400 /////BEGIN CRITICAL SECTION/////
401 //put prefix on index entry for idata.val
402 err
= perform_ops("\t\t" + client_name
+ "-split:", out_data
, &ops
);
406 if (verbose
) cout
<< "\t\t" << client_name
<< "-split: done splitting."
408 /////END CRITICAL SECTION/////
410 for (vector
<delete_data
>::iterator it
= out_data
.to_delete
.begin();
411 it
!= out_data
.to_delete
.end(); ++it
) {
412 icache
.erase(it
->max
);
414 for (vector
<create_data
>::iterator it
= out_data
.to_create
.begin();
415 it
!= out_data
.to_create
.end(); ++it
) {
416 icache
.push(index_data(*it
));
418 icache_lock
.Unlock();
422 int KvFlatBtreeAsync::rebalance(const index_data
&idata1
,
423 const index_data
&next_idata
){
427 if (idata1
.prefix
!= "") {
431 rebalance_args args1
;
433 args1
.comparator
= CEPH_OSD_CMPXATTR_OP_LT
;
434 index_data idata2
= next_idata
;
436 rebalance_args args2
;
438 args2
.comparator
= CEPH_OSD_CMPXATTR_OP_LT
;
440 if (idata1
.kdata
.prefix
== "1") {
441 //this is the highest key in the index, so it doesn't have a next.
443 //read the index for the previous entry
444 err
= prev(idata1
, &idata2
);
445 if (err
== -ERANGE
) {
446 if (verbose
) cout
<< "\t\t" << client_name
447 << "-rebalance: this is the only node, "
448 << "so aborting" << std::endl
;
450 } else if (err
< 0) {
454 //read the first object
455 err
= read_object(idata1
.obj
, &args2
);
457 if (verbose
) cout
<< "reading " << idata1
.obj
<< " failed with " << err
459 if (err
== -ENOENT
) {
464 args2
.odata
.min_kdata
= idata1
.min_kdata
;
465 args2
.odata
.max_kdata
= idata1
.kdata
;
467 //read the second object
468 args1
.bound
= 2 * k
+ 1;
469 err
= read_object(idata2
.obj
, &args1
);
471 if (verbose
) cout
<< "reading " << idata1
.obj
<< " failed with " << err
475 args1
.odata
.min_kdata
= idata2
.min_kdata
;
476 args1
.odata
.max_kdata
= idata2
.kdata
;
478 if (verbose
) cout
<< "\t\t" << client_name
<< "-rebalance: read "
480 << ". size: " << args1
.odata
.size
<< " version: "
481 << args1
.odata
.version
484 assert (next_idata
.obj
!= "");
485 //there is a next key, so get it.
486 err
= read_object(idata1
.obj
, &args1
);
488 if (verbose
) cout
<< "reading " << idata1
.obj
<< " failed with " << err
492 args1
.odata
.min_kdata
= idata1
.min_kdata
;
493 args1
.odata
.max_kdata
= idata1
.kdata
;
495 args2
.bound
= 2 * k
+ 1;
496 err
= read_object(idata2
.obj
, &args2
);
498 if (verbose
) cout
<< "reading " << idata1
.obj
<< " failed with " << err
500 if (err
== -ENOENT
) {
505 args2
.odata
.min_kdata
= idata2
.min_kdata
;
506 args2
.odata
.max_kdata
= idata2
.kdata
;
508 if (verbose
) cout
<< "\t\t" << client_name
<< "-rebalance: read "
510 << ". size: " << args2
.odata
.size
<< " version: "
511 << args2
.odata
.version
515 if (verbose
) cout
<< "\t\t" << client_name
<< "-rebalance: o1 is "
516 << args1
.odata
.max_kdata
.encoded() << ","
517 << args1
.odata
.name
<< " with size " << args1
.odata
.size
518 << " , o2 is " << args2
.odata
.max_kdata
.encoded()
519 << "," << args2
.odata
.name
<< " with size " << args2
.odata
.size
523 if ((int)args1
.odata
.size
> k
&& (int)args1
.odata
.size
<= 2*k
524 && (int)args2
.odata
.size
> k
525 && (int)args2
.odata
.size
<= 2*k
) {
527 if (verbose
) cout
<< "\t\t" << client_name
528 << "-rebalance: both sizes in range, so"
529 << " aborting " << std::endl
;
531 } else if (idata1
.prefix
!= "" || idata2
.prefix
!= "") {
535 //this is the high object. it gets created regardless of rebalance or merge.
536 client_index_lock
.Lock();
537 string o2w
= to_string(client_name
, client_index
++);
538 client_index_lock
.Unlock();
540 vector
<object_data
> to_create
;
541 vector
<object_data
> to_delete
;
542 librados::ObjectWriteOperation create
[2];//possibly only 1 will be used
543 librados::ObjectWriteOperation other_ops
[6];
544 vector
<pair
<pair
<int, string
>, librados::ObjectWriteOperation
*> > ops
;
545 ops
.push_back(make_pair(
546 pair
<int, string
>(ADD_PREFIX
, index_name
),
549 if ((int)args1
.odata
.size
+ (int)args2
.odata
.size
<= 2*k
) {
551 if (verbose
) cout
<< "\t\t" << client_name
<< "-rebalance: merging "
553 << " and " << args2
.odata
.name
<< " to get " << o2w
555 map
<string
, bufferlist
> write2_map
;
556 write2_map
.insert(args1
.odata
.omap
.begin(), args1
.odata
.omap
.end());
557 write2_map
.insert(args2
.odata
.omap
.begin(), args2
.odata
.omap
.end());
558 to_create
.push_back(object_data(args1
.odata
.min_kdata
,
559 args2
.odata
.max_kdata
, o2w
, write2_map
));
560 ops
.push_back(make_pair(
561 pair
<int, string
>(MAKE_OBJECT
, o2w
),
563 ceph_assert((int)write2_map
.size() <= 2*k
);
566 if (verbose
) cout
<< "\t\t" << client_name
<< "-rebalance: rebalancing "
568 << " and " << args2
.odata
.name
<< std::endl
;
569 map
<std::string
, bufferlist
> write1_map
;
570 map
<std::string
, bufferlist
> write2_map
;
571 map
<std::string
, bufferlist
>::iterator it
;
572 client_index_lock
.Lock();
573 string o1w
= to_string(client_name
, client_index
++);
574 client_index_lock
.Unlock();
575 int target_size_1
= ceil(((int)args1
.odata
.size
+ (int)args2
.odata
.size
)
577 if (args1
.odata
.max_kdata
!= idata1
.kdata
) {
578 //this should be true if idata1 is the high object
579 target_size_1
= floor(((int)args1
.odata
.size
+ (int)args2
.odata
.size
)
582 for (it
= args1
.odata
.omap
.begin();
583 it
!= args1
.odata
.omap
.end() && (int)write1_map
.size()
586 write1_map
.insert(*it
);
588 if (it
!= args1
.odata
.omap
.end()){
589 //write1_map is full, so put the rest in write2_map
590 write2_map
.insert(it
, args1
.odata
.omap
.end());
591 write2_map
.insert(args2
.odata
.omap
.begin(), args2
.odata
.omap
.end());
593 //args1.odata.omap was small, and write2_map still needs more
594 map
<std::string
, bufferlist
>::iterator it2
;
595 for(it2
= args2
.odata
.omap
.begin();
596 (it2
!= args2
.odata
.omap
.end()) && ((int)write1_map
.size()
599 write1_map
.insert(*it2
);
601 write2_map
.insert(it2
, args2
.odata
.omap
.end());
603 if (verbose
) cout
<< "\t\t" << client_name
604 << "-rebalance: write1_map has size "
605 << write1_map
.size() << ", write2_map.size() is " << write2_map
.size()
607 //at this point, write1_map and write2_map should have the correct pairs
608 to_create
.push_back(object_data(args1
.odata
.min_kdata
,
609 key_data(write1_map
.rbegin()->first
),
611 to_create
.push_back(object_data( key_data(write1_map
.rbegin()->first
),
612 args2
.odata
.max_kdata
, o2w
, write2_map
));
613 ops
.push_back(make_pair(
614 pair
<int, string
>(MAKE_OBJECT
, o1w
),
616 ops
.push_back(make_pair(
617 pair
<int, string
>(MAKE_OBJECT
, o2w
),
621 to_delete
.push_back(object_data(args1
.odata
.min_kdata
,
622 args1
.odata
.max_kdata
, args1
.odata
.name
, args1
.odata
.version
));
623 to_delete
.push_back(object_data(args2
.odata
.min_kdata
,
624 args2
.odata
.max_kdata
, args2
.odata
.name
, args2
.odata
.version
));
625 for (int i
= 1; i
< 6; i
++) {
626 ops
.push_back(make_pair(make_pair(0,""), &other_ops
[i
]));
630 set_up_prefix_index(to_create
, to_delete
, &other_ops
[0], &out_data
, &err
);
631 set_up_ops(to_create
, to_delete
, &ops
, out_data
, &err
);
633 //at this point, all operations should be completely set up.
634 /////BEGIN CRITICAL SECTION/////
635 err
= perform_ops("\t\t" + client_name
+ "-rebalance:", out_data
, &ops
);
640 for (vector
<delete_data
>::iterator it
= out_data
.to_delete
.begin();
641 it
!= out_data
.to_delete
.end(); ++it
) {
642 icache
.erase(it
->max
);
644 for (vector
<create_data
>::iterator it
= out_data
.to_create
.begin();
645 it
!= out_data
.to_create
.end(); ++it
) {
646 icache
.push(index_data(*it
));
648 icache_lock
.Unlock();
649 if (verbose
) cout
<< "\t\t" << client_name
<< "-rebalance: done rebalancing."
651 /////END CRITICAL SECTION/////
655 int KvFlatBtreeAsync::read_object(const string
&obj
, object_data
* odata
) {
656 librados::ObjectReadOperation get_obj
;
657 librados::AioCompletion
* obj_aioc
= rados
.aio_create_completion();
661 get_obj
.omap_get_vals2("", LONG_MAX
, &odata
->omap
, nullptr, &err
);
662 get_obj
.getxattr("unwritable", &unw_bl
, &err
);
663 io_ctx
.aio_operate(obj
, obj_aioc
, &get_obj
, NULL
);
664 obj_aioc
->wait_for_safe();
665 err
= obj_aioc
->get_return_value();
667 //possibly -ENOENT, meaning someone else deleted it.
671 odata
->unwritable
= string(unw_bl
.c_str(), unw_bl
.length()) == "1";
672 odata
->version
= obj_aioc
->get_version64();
673 odata
->size
= odata
->omap
.size();
678 int KvFlatBtreeAsync::read_object(const string
&obj
, rebalance_args
* args
) {
683 librados::AioCompletion
* a
= rados
.aio_create_completion();
684 io_ctx
.aio_exec(obj
, a
, "kvs", "maybe_read_for_balance", inbl
, &outbl
);
686 err
= a
->get_return_value();
688 if (verbose
) cout
<< "\t\t" << client_name
689 << "-read_object: reading failed with "
694 auto it
= outbl
.cbegin();
696 args
->odata
.name
= obj
;
697 args
->odata
.version
= a
->get_version64();
702 void KvFlatBtreeAsync::set_up_prefix_index(
703 const vector
<object_data
> &to_create
,
704 const vector
<object_data
> &to_delete
,
705 librados::ObjectWriteOperation
* owo
,
708 std::map
<std::string
, pair
<bufferlist
, int> > assertions
;
709 map
<string
, bufferlist
> to_insert
;
711 idata
->ts
= ceph_clock_now();
712 for(vector
<object_data
>::const_iterator it
= to_create
.begin();
713 it
!= to_create
.end();
715 create_data
c(it
->min_kdata
, it
->max_kdata
, it
->name
);
716 idata
->to_create
.push_back(c
);
718 for(vector
<object_data
>::const_iterator it
= to_delete
.begin();
719 it
!= to_delete
.end();
721 delete_data
d(it
->min_kdata
, it
->max_kdata
, it
->name
, it
->version
);
722 idata
->to_delete
.push_back(d
);
724 for(vector
<object_data
>::const_iterator it
= to_delete
.begin();
725 it
!= to_delete
.end();
727 idata
->obj
= it
->name
;
728 idata
->min_kdata
= it
->min_kdata
;
729 idata
->kdata
= it
->max_kdata
;
731 idata
->encode(insert
);
732 to_insert
[it
->max_kdata
.encoded()] = insert
;
733 index_data this_entry
;
734 this_entry
.min_kdata
= idata
->min_kdata
;
735 this_entry
.kdata
= idata
->kdata
;
736 this_entry
.obj
= idata
->obj
;
737 assertions
[it
->max_kdata
.encoded()] = pair
<bufferlist
, int>
738 (to_bl(this_entry
), CEPH_OSD_CMPXATTR_OP_EQ
);
739 if (verbose
) cout
<< "\t\t\t" << client_name
740 << "-setup_prefix: will assert "
741 << this_entry
.str() << std::endl
;
743 ceph_assert(*err
== 0);
744 owo
->omap_cmp(assertions
, err
);
745 if (to_create
.size() <= 2) {
746 owo
->omap_set(to_insert
);
750 //some args can be null if there are no corresponding entries in p
751 void KvFlatBtreeAsync::set_up_ops(
752 const vector
<object_data
> &create_vector
,
753 const vector
<object_data
> &delete_vector
,
754 vector
<pair
<pair
<int, string
>, librados::ObjectWriteOperation
*> > * ops
,
755 const index_data
&idata
,
757 vector
<pair
<pair
<int, string
>,
758 librados::ObjectWriteOperation
* > >::iterator it
;
760 //skip the prefixing part
761 for(it
= ops
->begin(); it
->first
.first
== ADD_PREFIX
; ++it
) {}
762 map
<string
, bufferlist
> to_insert
;
763 std::set
<string
> to_remove
;
764 map
<string
, pair
<bufferlist
, int> > assertions
;
765 if (create_vector
.size() > 0) {
766 for (int i
= 0; i
< (int)idata
.to_delete
.size(); ++i
) {
767 it
->first
= pair
<int, string
>(UNWRITE_OBJECT
, idata
.to_delete
[i
].obj
);
768 set_up_unwrite_object(delete_vector
[i
].version
, it
->second
);
772 for (int i
= 0; i
< (int)idata
.to_create
.size(); ++i
) {
773 index_data
this_entry(idata
.to_create
[i
].max
, idata
.to_create
[i
].min
,
774 idata
.to_create
[i
].obj
);
775 to_insert
[idata
.to_create
[i
].max
.encoded()] = to_bl(this_entry
);
776 if (idata
.to_create
.size() <= 2) {
777 it
->first
= pair
<int, string
>(MAKE_OBJECT
, idata
.to_create
[i
].obj
);
779 it
->first
= pair
<int, string
>(AIO_MAKE_OBJECT
, idata
.to_create
[i
].obj
);
781 set_up_make_object(create_vector
[i
].omap
, it
->second
);
784 for (int i
= 0; i
< (int)idata
.to_delete
.size(); ++i
) {
785 index_data this_entry
= idata
;
786 this_entry
.obj
= idata
.to_delete
[i
].obj
;
787 this_entry
.min_kdata
= idata
.to_delete
[i
].min
;
788 this_entry
.kdata
= idata
.to_delete
[i
].max
;
789 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-setup_ops: will assert "
790 << this_entry
.str() << std::endl
;
791 assertions
[idata
.to_delete
[i
].max
.encoded()] = pair
<bufferlist
, int>(
792 to_bl(this_entry
), CEPH_OSD_CMPXATTR_OP_EQ
);
793 to_remove
.insert(idata
.to_delete
[i
].max
.encoded());
794 it
->first
= pair
<int, string
>(REMOVE_OBJECT
, idata
.to_delete
[i
].obj
);
795 set_up_delete_object(it
->second
);
798 if ((int)idata
.to_create
.size() <= 2) {
799 it
->second
->omap_cmp(assertions
, err
);
801 it
->second
->omap_rm_keys(to_remove
);
802 it
->second
->omap_set(to_insert
);
805 it
->first
= pair
<int, string
>(REMOVE_PREFIX
, index_name
);
808 void KvFlatBtreeAsync::set_up_make_object(
809 const map
<std::string
, bufferlist
> &to_set
,
810 librados::ObjectWriteOperation
*owo
) {
812 encode(to_set
, inbl
);
813 owo
->exec("kvs", "create_with_omap", inbl
);
816 void KvFlatBtreeAsync::set_up_unwrite_object(
817 const int &ver
, librados::ObjectWriteOperation
*owo
) {
819 owo
->assert_version(ver
);
821 owo
->cmpxattr("unwritable", CEPH_OSD_CMPXATTR_OP_EQ
, to_bl("0"));
822 owo
->setxattr("unwritable", to_bl("1"));
825 void KvFlatBtreeAsync::set_up_restore_object(
826 librados::ObjectWriteOperation
*owo
) {
827 owo
->cmpxattr("unwritable", CEPH_OSD_CMPXATTR_OP_EQ
, to_bl("1"));
828 owo
->setxattr("unwritable", to_bl("0"));
831 void KvFlatBtreeAsync::set_up_delete_object(
832 librados::ObjectWriteOperation
*owo
) {
833 owo
->cmpxattr("unwritable", CEPH_OSD_CMPXATTR_OP_EQ
, to_bl("1"));
837 int KvFlatBtreeAsync::perform_ops(const string
&debug_prefix
,
838 const index_data
&idata
,
839 vector
<pair
<pair
<int, string
>, librados::ObjectWriteOperation
*> > *ops
) {
841 vector
<librados::AioCompletion
*> aiocs(idata
.to_create
.size());
843 for (vector
<pair
<pair
<int, string
>,
844 librados::ObjectWriteOperation
*> >::iterator it
= ops
->begin();
845 it
!= ops
->end(); ++it
) {
846 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 ) {
849 switch (it
->first
.first
) {
850 case ADD_PREFIX
://prefixing
851 if (verbose
) cout
<< debug_prefix
<< " adding prefix" << std::endl
;
852 err
= io_ctx
.operate(index_name
, it
->second
);
854 if (verbose
) cout
<< debug_prefix
<< " prefixing the index failed with "
858 if (verbose
) cout
<< debug_prefix
<< " prefix added." << std::endl
;
860 case UNWRITE_OBJECT
://marking
861 if (verbose
) cout
<< debug_prefix
<< " marking " << it
->first
.second
863 err
= io_ctx
.operate(it
->first
.second
, it
->second
);
865 //most likely because it changed, in which case it will be -ERANGE
866 if (verbose
) cout
<< debug_prefix
<< " marking " << it
->first
.second
867 << "failed with code" << err
<< std::endl
;
868 if (it
->first
.second
== (*idata
.to_delete
.begin()).max
.encoded()) {
869 if (cleanup(idata
, -EFIRSTOBJ
) == -ESUICIDE
) {
873 if (cleanup(idata
, -ERANGE
) == -ESUICIDE
) {
879 if (verbose
) cout
<< debug_prefix
<< " marked " << it
->first
.second
882 case MAKE_OBJECT
://creating
883 if (verbose
) cout
<< debug_prefix
<< " creating " << it
->first
.second
885 err
= io_ctx
.operate(it
->first
.second
, it
->second
);
887 //this can happen if someone else was cleaning up after us.
888 if (verbose
) cout
<< debug_prefix
<< " creating " << it
->first
.second
890 << " with code " << err
<< std::endl
;
891 if (err
== -EEXIST
) {
892 //someone thinks we died, so die
893 if (verbose
) cout
<< client_name
<< " is suiciding!" << std::endl
;
900 if (verbose
|| idata
.to_create
.size() > 2) {
901 cout
<< debug_prefix
<< " created object " << it
->first
.second
905 case AIO_MAKE_OBJECT
:
906 cout
<< debug_prefix
<< " launching asynchronous create "
907 << it
->first
.second
<< std::endl
;
908 aiocs
[count
] = rados
.aio_create_completion();
909 io_ctx
.aio_operate(it
->first
.second
, aiocs
[count
], it
->second
);
911 if ((int)idata
.to_create
.size() == count
) {
912 cout
<< "starting aiowrite waiting loop" << std::endl
;
913 for (count
-= 1; count
>= 0; count
--) {
914 aiocs
[count
]->wait_for_safe();
915 err
= aiocs
[count
]->get_return_value();
917 //this can happen if someone else was cleaning up after us.
918 cerr
<< debug_prefix
<< " a create failed"
919 << " with code " << err
<< std::endl
;
920 if (err
== -EEXIST
) {
921 //someone thinks we died, so die
922 cerr
<< client_name
<< " is suiciding!" << std::endl
;
929 if (verbose
|| idata
.to_create
.size() > 2) {
930 cout
<< debug_prefix
<< " completed aio " << aiocs
.size() - count
931 << "/" << aiocs
.size() << std::endl
;
936 case REMOVE_OBJECT
://deleting
937 if (verbose
) cout
<< debug_prefix
<< " deleting " << it
->first
.second
939 err
= io_ctx
.operate(it
->first
.second
, it
->second
);
941 //if someone else called cleanup on this prefix first
942 if (verbose
) cout
<< debug_prefix
<< " deleting " << it
->first
.second
943 << "failed with code" << err
<< std::endl
;
945 if (verbose
) cout
<< debug_prefix
<< " deleted " << it
->first
.second
948 case REMOVE_PREFIX
://rewriting index
949 if (verbose
) cout
<< debug_prefix
<< " updating index " << std::endl
;
950 err
= io_ctx
.operate(index_name
, it
->second
);
952 if (verbose
) cout
<< debug_prefix
953 << " rewriting the index failed with code " << err
954 << ". someone else must have thought we died, so dying" << std::endl
;
957 if (verbose
) cout
<< debug_prefix
<< " updated index." << std::endl
;
960 if (verbose
) cout
<< debug_prefix
<< " restoring " << it
->first
.second
962 err
= io_ctx
.operate(it
->first
.second
, it
->second
);
964 if (verbose
) cout
<< debug_prefix
<< "restoring " << it
->first
.second
966 << " with " << err
<< std::endl
;
969 if (verbose
) cout
<< debug_prefix
<< " restored " << it
->first
.second
973 if (verbose
) cout
<< debug_prefix
<< " performing unknown op on "
976 err
= io_ctx
.operate(index_name
, it
->second
);
978 if (verbose
) cout
<< debug_prefix
<< " unknown op on "
980 << " failed with " << err
<< std::endl
;
983 if (verbose
) cout
<< debug_prefix
<< " unknown op on "
985 << " succeeded." << std::endl
;
993 int KvFlatBtreeAsync::cleanup(const index_data
&idata
, const int &error
) {
994 if (verbose
) cout
<< "\t\t" << client_name
<< ": cleaning up after "
998 ceph_assert(idata
.prefix
!= "");
999 map
<std::string
,bufferlist
> new_index
;
1000 map
<std::string
, pair
<bufferlist
, int> > assertions
;
1003 //this happens if the split or rebalance failed to mark the first object,
1004 //meaning only the index needs to be changed.
1005 //restore objects that had been marked unwritable.
1006 for(vector
<delete_data
>::const_iterator it
=
1007 idata
.to_delete
.begin();
1008 it
!= idata
.to_delete
.end(); ++it
) {
1009 index_data this_entry
;
1010 this_entry
.obj
= (*it
).obj
;
1011 this_entry
.min_kdata
= it
->min
;
1012 this_entry
.kdata
= it
->max
;
1013 new_index
[it
->max
.encoded()] = to_bl(this_entry
);
1015 this_entry
.obj
= it
->obj
;
1016 this_entry
.min_kdata
= it
->min
;
1017 this_entry
.kdata
= it
->max
;
1018 if (verbose
) cout
<< "\t\t\t" << client_name
1019 << "-cleanup: will assert index contains "
1020 << this_entry
.str() << std::endl
;
1021 assertions
[it
->max
.encoded()] =
1022 pair
<bufferlist
, int>(to_bl(this_entry
),
1023 CEPH_OSD_CMPXATTR_OP_EQ
);
1027 librados::ObjectWriteOperation update_index
;
1028 update_index
.omap_cmp(assertions
, &err
);
1029 update_index
.omap_set(new_index
);
1030 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: updating index"
1032 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 ) {
1035 err
= io_ctx
.operate(index_name
, &update_index
);
1037 if (verbose
) cout
<< "\t\t\t" << client_name
1038 << "-cleanup: rewriting failed with "
1039 << err
<< ". returning -ECANCELED" << std::endl
;
1042 if (verbose
) cout
<< "\t\t\t" << client_name
1043 << "-cleanup: updated index. cleanup done."
1048 //this happens if a split or rebalance fails to mark an object. It is a
1049 //special case of rolling back that does not have to deal with new objects.
1051 //restore objects that had been marked unwritable.
1052 vector
<delete_data
>::const_iterator it
;
1053 for(it
= idata
.to_delete
.begin();
1054 it
!= idata
.to_delete
.end(); ++it
) {
1055 index_data this_entry
;
1056 this_entry
.obj
= (*it
).obj
;
1057 this_entry
.min_kdata
= it
->min
;
1058 this_entry
.kdata
= it
->max
;
1059 new_index
[it
->max
.encoded()] = to_bl(this_entry
);
1061 this_entry
.obj
= it
->obj
;
1062 this_entry
.min_kdata
= it
->min
;
1063 this_entry
.kdata
= it
->max
;
1064 if (verbose
) cout
<< "\t\t\t" << client_name
1065 << "-cleanup: will assert index contains "
1066 << this_entry
.str() << std::endl
;
1067 assertions
[it
->max
.encoded()] =
1068 pair
<bufferlist
, int>(to_bl(this_entry
),
1069 CEPH_OSD_CMPXATTR_OP_EQ
);
1071 it
= idata
.to_delete
.begin();
1072 librados::ObjectWriteOperation restore
;
1073 set_up_restore_object(&restore
);
1074 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 ) {
1077 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: restoring "
1080 err
= io_ctx
.operate(it
->obj
, &restore
);
1082 //i.e., -ECANCELED because the object was already restored by someone
1084 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: restoring "
1086 << " failed with " << err
<< std::endl
;
1088 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: restored "
1094 librados::ObjectWriteOperation update_index
;
1095 update_index
.omap_cmp(assertions
, &err
);
1096 update_index
.omap_set(new_index
);
1097 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: updating index"
1099 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 ) {
1102 err
= io_ctx
.operate(index_name
, &update_index
);
1104 if (verbose
) cout
<< "\t\t\t" << client_name
1105 << "-cleanup: rewriting failed with "
1106 << err
<< ". returning -ECANCELED" << std::endl
;
1109 if (verbose
) cout
<< "\t\t\t" << client_name
1110 << "-cleanup: updated index. cleanup done."
1115 if (verbose
) cout
<< "\t\t" << client_name
<< "-cleanup: rolling forward"
1117 //all changes were created except for updating the index and possibly
1118 //deleting the objects. roll forward.
1119 vector
<pair
<pair
<int, string
>, librados::ObjectWriteOperation
*> > ops
;
1120 vector
<librados::ObjectWriteOperation
> owos(idata
.to_delete
.size() + 1);
1121 for (int i
= 0; i
<= (int)idata
.to_delete
.size(); ++i
) {
1122 ops
.push_back(make_pair(pair
<int, string
>(0, ""), &owos
[i
]));
1124 set_up_ops(vector
<object_data
>(),
1125 vector
<object_data
>(), &ops
, idata
, &err
);
1126 err
= perform_ops("\t\t" + client_name
+ "-cleanup:", idata
, &ops
);
1128 if (err
== -ESUICIDE
) {
1131 if (verbose
) cout
<< "\t\t\t" << client_name
1132 << "-cleanup: rewriting failed with "
1133 << err
<< ". returning -ECANCELED" << std::endl
;
1136 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: updated index"
1141 //roll back all changes.
1142 if (verbose
) cout
<< "\t\t" << client_name
<< "-cleanup: rolling back"
1144 map
<std::string
,bufferlist
> new_index
;
1145 std::set
<string
> to_remove
;
1146 map
<std::string
, pair
<bufferlist
, int> > assertions
;
1148 //mark the objects to be created. if someone else already has, die.
1149 for(vector
<create_data
>::const_reverse_iterator it
=
1150 idata
.to_create
.rbegin();
1151 it
!= idata
.to_create
.rend(); ++it
) {
1152 librados::ObjectWriteOperation rm
;
1153 set_up_unwrite_object(0, &rm
);
1154 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 )
1158 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: marking "
1161 err
= io_ctx
.operate(it
->obj
, &rm
);
1163 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: marking "
1165 << " failed with " << err
<< std::endl
;
1167 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: marked "
1173 //restore objects that had been marked unwritable.
1174 for(vector
<delete_data
>::const_iterator it
=
1175 idata
.to_delete
.begin();
1176 it
!= idata
.to_delete
.end(); ++it
) {
1177 index_data this_entry
;
1178 this_entry
.obj
= (*it
).obj
;
1179 this_entry
.min_kdata
= it
->min
;
1180 this_entry
.kdata
= it
->max
;
1181 new_index
[it
->max
.encoded()] = to_bl(this_entry
);
1183 this_entry
.obj
= it
->obj
;
1184 this_entry
.min_kdata
= it
->min
;
1185 this_entry
.kdata
= it
->max
;
1186 if (verbose
) cout
<< "\t\t\t" << client_name
1187 << "-cleanup: will assert index contains "
1188 << this_entry
.str() << std::endl
;
1189 assertions
[it
->max
.encoded()] =
1190 pair
<bufferlist
, int>(to_bl(this_entry
),
1191 CEPH_OSD_CMPXATTR_OP_EQ
);
1192 librados::ObjectWriteOperation restore
;
1193 set_up_restore_object(&restore
);
1194 if (verbose
) cout
<< "\t\t\t" << client_name
1195 << "-cleanup: will assert index contains "
1196 << this_entry
.str() << std::endl
;
1197 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 )
1201 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: restoring "
1204 err
= io_ctx
.operate(it
->obj
, &restore
);
1205 if (err
== -ENOENT
) {
1206 //it had gotten far enough to be rolled forward - unmark the objects
1208 if (verbose
) cout
<< "\t\t\t" << client_name
1209 << "-cleanup: roll forward instead"
1211 for(vector
<create_data
>::const_iterator cit
=
1212 idata
.to_create
.begin();
1213 cit
!= idata
.to_create
.end(); ++cit
) {
1214 librados::ObjectWriteOperation res
;
1215 set_up_restore_object(&res
);
1216 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)()
1220 if (verbose
) cout
<< "\t\t\t" << client_name
1221 << "-cleanup: restoring " << cit
->obj
1223 err
= io_ctx
.operate(cit
->obj
, &res
);
1225 if (verbose
) cout
<< "\t\t\t" << client_name
1226 << "-cleanup: restoring "
1227 << cit
->obj
<< " failed with " << err
<< std::endl
;
1229 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: restored "
1233 return cleanup(idata
, -ENOENT
);
1234 } else if (err
< 0) {
1235 //i.e., -ECANCELED because the object was already restored by someone
1237 if (verbose
) cout
<< "\t\t\t" << client_name
1238 << "-cleanup: restoring " << it
->obj
1239 << " failed with " << err
<< std::endl
;
1241 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: restored "
1247 //remove the new objects
1248 for(vector
<create_data
>::const_reverse_iterator it
=
1249 idata
.to_create
.rbegin();
1250 it
!= idata
.to_create
.rend(); ++it
) {
1251 to_remove
.insert(it
->max
.encoded());
1252 librados::ObjectWriteOperation rm
;
1254 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 )
1258 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: removing "
1261 err
= io_ctx
.operate(it
->obj
, &rm
);
1263 if (verbose
) cout
<< "\t\t\t" << client_name
1264 << "-cleanup: failed to remove "
1265 << it
->obj
<< std::endl
;
1267 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: removed "
1274 librados::ObjectWriteOperation update_index
;
1275 update_index
.omap_cmp(assertions
, &err
);
1276 update_index
.omap_rm_keys(to_remove
);
1277 update_index
.omap_set(new_index
);
1278 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: updating index"
1280 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 ) {
1283 err
= io_ctx
.operate(index_name
, &update_index
);
1285 if (verbose
) cout
<< "\t\t\t" << client_name
1286 << "-cleanup: rewriting failed with "
1287 << err
<< ". returning -ECANCELED" << std::endl
;
1290 if (verbose
) cout
<< "\t\t\t" << client_name
1291 << "-cleanup: updated index. cleanup done."
// KvFlatBtreeAsync::to_string(string s, int i): string helper taking a string
// and an int and returning a string.
// NOTE: only the signature survives in this listing; the body lines are absent.
// Callers in this file use it as to_string(client_name, client_index++) to
// build new object names and as to_string("", n) to measure the printed width
// of an integer, so it presumably returns s followed by the decimal form of i
// -- TODO confirm against the full source.
1299 string
KvFlatBtreeAsync::to_string(string s
, int i
) {
// KvFlatBtreeAsync::get_name(): takes no arguments and returns a string.
// NOTE: only the signature survives in this listing; which member it returns
// cannot be determined from here.
1305 string
KvFlatBtreeAsync::get_name() {
// KvFlatBtreeAsync::set_inject(inject, wait_time): configures fault injection.
//   inject    - an injection_t; its use is on a line absent from this listing.
//               Presumably it is stored in the `interrupt` member that the
//               rest of this file invokes before I/O -- TODO confirm.
//   wait_time - copied into the wait_ms member (units presumably milliseconds,
//               judging by the member name only).
1309 void KvFlatBtreeAsync::set_inject(injection_t inject
, int wait_time
) {
1311 wait_ms
= wait_time
;
// KvFlatBtreeAsync::setup(argc, argv): connects to the cluster and creates the
// initial index object and first data object.
//   argc/argv - forwarded to rados.conf_parse_argv().
// NOTE: several original lines of this definition (error checks, returns,
// closing braces) are absent from this listing; the comments describe only the
// statements that are visible.
1314 int KvFlatBtreeAsync::setup(int argc
, const char** argv
) {
// Bring up the rados handle step by step; r holds the result of the most
// recent call and each failure message below prints it to cerr.
1315 int r
= rados
.init(rados_id
.c_str());
1317 cerr
<< "error during init" << r
<< std::endl
;
1320 r
= rados
.conf_parse_argv(argc
, argv
);
1322 cerr
<< "error during parsing args" << r
<< std::endl
;
1325 r
= rados
.conf_parse_env(NULL
);
1327 cerr
<< "error during parsing env" << r
<< std::endl
;
1330 r
= rados
.conf_read_file(NULL
);
1332 cerr
<< "error during read file: " << r
<< std::endl
;
1335 r
= rados
.connect();
1337 cerr
<< "error during connect: " << r
<< std::endl
;
// Open an io context on pool_name; all later object operations go through it.
1340 r
= rados
.ioctx_create(pool_name
.c_str(), io_ctx
);
1342 cerr
<< "error creating io ctx: " << r
<< std::endl
;
// Build the index object: create(true) plus a single omap entry under key "1"
// whose value is the encoded index_data for an object named client_name with
// an empty min key and an empty max key.
1347 librados::ObjectWriteOperation make_index
;
1348 make_index
.create(true);
1349 map
<std::string
,bufferlist
> index_map
;
1351 idata
.obj
= client_name
;
1352 idata
.min_kdata
.raw_key
= "";
1353 idata
.kdata
= key_data("");
1354 index_map
["1"] = to_bl(idata
);
1355 make_index
.omap_set(index_map
);
1356 r
= io_ctx
.operate(index_name
, &make_index
);
1358 if (verbose
) cout
<< client_name
<< ": Making the index failed with code "
1363 if (verbose
) cout
<< client_name
<< ": created index object" << std::endl
;
// Create the first data object (named client_name) with the xattrs
// "unwritable" and "size" both initialised to "0".
1365 librados::ObjectWriteOperation make_max_obj
;
1366 make_max_obj
.create(true);
1367 make_max_obj
.setxattr("unwritable", to_bl("0"));
1368 make_max_obj
.setxattr("size", to_bl("0"));
1369 r
= io_ctx
.operate(client_name
, &make_max_obj
);
1371 if (verbose
) cout
<< client_name
<< ": Setting xattr failed with code "
// KvFlatBtreeAsync::set(key, val, update_on_existing): entry point for writing
// one key/value pair.
//   key                - key to write.
//   val                - value to store.
//   update_on_existing - forwarded to set_op(); also selects the "updating" vs
//                        "setting" wording of the log line.
// Visible flow: look up the index entry for key with read_index(), then hand
// the resolved index_data to set_op().
// NOTE: several original lines (declaration of err, error-path returns, final
// return) are absent from this listing.
1379 int KvFlatBtreeAsync::set(const string
&key
, const bufferlist
&val
,
1380 bool update_on_existing
) {
1381 if (verbose
) cout
<< client_name
<< " is "
1382 << (update_on_existing
? "updating " : "setting ")
1383 << key
<< std::endl
;
1386 index_data
idata(key
);
1388 if (verbose
) cout
<< "\t" << client_name
<< ": finding oid" << std::endl
;
// Last argument is false here; the retry paths in set_op() pass true.
// Presumably this flag forces a fresh index read -- TODO confirm in read_index.
1389 err
= read_index(key
, &idata
, NULL
, false);
1391 if (verbose
) cout
<< "\t" << client_name
1392 << ": getting oid failed with code "
1393 << err
<< std::endl
;
1396 if (verbose
) cout
<< "\t" << client_name
<< ": index data is " << idata
.str()
1397 << ", object is " << idata
.obj
<< std::endl
;
// The real work (insert, split handling, retries) happens in set_op().
1399 err
= set_op(key
, val
, update_on_existing
, idata
);
1401 if (verbose
) cout
<< "\t" << client_name
<< ": finished set with " << err
// KvFlatBtreeAsync::set_op(key, val, update_on_existing, idata): performs the
// insert of key/val into the object named by idata.obj and handles the
// failure cases by refreshing idata and retrying recursively.
//   update_on_existing - when false the insert is exclusive
//                        (args.exclusive = !update_on_existing).
//   idata              - in/out: refreshed by read_index() on the retry paths.
// Visible flow:
//   * exec the "kvs"/"omap_insert" class method on idata.obj;
//   * on -EKEYREJECTED the object has to be split, after which the index is
//     re-read and set_op() is called again;
//   * on -ENOENT / -EACCES the index is re-read and set_op() is called again.
// NOTE: many original lines (declaration/encoding of args and inbl, the switch
// header, other case labels, the split call, returns and closing braces) are
// absent from this listing; the comments cover only the visible statements.
1406 int KvFlatBtreeAsync::set_op(const string
&key
, const bufferlist
&val
,
1407 bool update_on_existing
, index_data
&idata
) {
1413 args
.exclusive
= !update_on_existing
;
1414 args
.omap
[key
] = val
;
1417 librados::ObjectWriteOperation owo
;
1418 owo
.exec("kvs", "omap_insert", inbl
);
// Fault-injection hook: interrupt is invoked through a pointer-to-member; a
// result of 1 takes the "IS SUICIDING!" branch before the write is issued.
1419 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 ) {
1420 if (verbose
) cout
<< client_name
<< " IS SUICIDING!" << std::endl
;
1423 if (verbose
) cout
<< "\t" << client_name
<< ": inserting " << key
1425 << idata
.obj
<< std::endl
;
1426 int err
= io_ctx
.operate(idata
.obj
, &owo
);
1430 //the key already exists and this is an exclusive insert.
1431 cerr
<< "\t" << client_name
<< ": writing key failed with "
1432 << err
<< std::endl
;
1435 case -EKEYREJECTED
: {
1436 //the object needs to be split.
1438 if (verbose
) cout
<< "\t" << client_name
<< ": running split on "
// Refresh idata (last argument true) before acting on the object.
1441 err
= read_index(key
, &idata
, NULL
, true);
1443 if (verbose
) cout
<< "\t" << client_name
1444 << ": getting oid failed with code "
1445 << err
<< std::endl
;
// -ENOENT and -EBALANCE are not treated as split failures here; any other
// negative code is passed to handle_set_rm_errors().
1449 if (err
< 0 && err
!= -ENOENT
&& err
!= -EBALANCE
) {
1450 if (verbose
) cerr
<< "\t" << client_name
<< ": split failed with "
1451 << err
<< std::endl
;
1452 int ret
= handle_set_rm_errors(err
, idata
.obj
, key
, &idata
, NULL
);
1455 if (verbose
) cout
<< client_name
<< " IS SUICIDING!" << std::endl
;
1459 return set_op(key
, val
, update_on_existing
, idata
);
// The enclosing do/while repeats while err is negative and is neither
// -EBALANCE nor -ENOENT.
1466 } while (err
< 0 && err
!= -EBALANCE
&& err
!= -ENOENT
);
1467 err
= read_index(key
, &idata
, NULL
, true);
1469 if (verbose
) cout
<< "\t" << client_name
1470 << ": getting oid failed with code "
1471 << err
<< std::endl
;
1474 return set_op(key
, val
, update_on_existing
, idata
);
1477 if (verbose
) cerr
<< "\t" << client_name
<< ": writing obj failed with "
1478 << err
<< std::endl
;
// -ENOENT is logged as "CACHE FAILURE"; for both -ENOENT and -EACCES the
// index entry is re-read and the insert is retried.
// NOTE(review): the retry is unbounded recursion in the visible code --
// confirm there is a termination condition on the missing lines.
1479 if (err
== -ENOENT
|| err
== -EACCES
) {
1480 if (err
== -ENOENT
) {
1481 if (verbose
) cout
<< "CACHE FAILURE" << std::endl
;
1483 err
= read_index(key
, &idata
, NULL
, true);
1485 if (verbose
) cout
<< "\t" << client_name
1486 << ": getting oid failed with code "
1487 << err
<< std::endl
;
1490 if (verbose
) cout
<< "\t" << client_name
<< ": index data is "
1492 << ", object is " << idata
.obj
<< std::endl
;
1493 return set_op(key
, val
, update_on_existing
, idata
);
// KvFlatBtreeAsync::remove(key): entry point for deleting one key.
// Visible flow: read_index() fills idata and next_idata for key, then
// remove_op() performs the removal using both entries.
// NOTE: several original lines (declarations of err, idata and obj, error
// returns, final return) are absent from this listing.
1502 int KvFlatBtreeAsync::remove(const string
&key
) {
1503 if (verbose
) cout
<< client_name
<< ": removing " << key
<< std::endl
;
1508 index_data next_idata
;
1510 if (verbose
) cout
<< "\t" << client_name
<< ": finding oid" << std::endl
;
// Unlike set(), a next_idata out-parameter is supplied; remove_op() passes it
// on to rebalance().
1511 err
= read_index(key
, &idata
, &next_idata
, false);
1513 if (verbose
) cout
<< "getting oid failed with code " << err
<< std::endl
;
1517 if (verbose
) cout
<< "\t" << client_name
<< ": idata is " << idata
.str()
1518 << ", next_idata is " << next_idata
.str()
1519 << ", obj is " << obj
<< std::endl
;
1521 err
= remove_op(key
, idata
, next_idata
);
1523 if (verbose
) cout
<< "\t" << client_name
<< ": finished remove with " << err
1524 << " and exiting" << std::endl
;
// KvFlatBtreeAsync::remove_op(key, idata, next_idata): removes key from the
// object named by idata.obj and handles the failure cases by refreshing the
// index entries and retrying recursively.
//   idata, next_idata - in/out: refreshed by read_index() on the retry paths
//                       and both passed to rebalance().
// Visible flow:
//   * exec the "kvs"/"omap_remove" class method on idata.obj;
//   * on -EKEYREJECTED run rebalance(idata, next_idata); a second
//     "omap_remove" is issued on the path commented "this is the only node";
//   * on -ENOENT / -EACCES re-read the index and call remove_op() again.
// NOTE: many original lines (declaration/encoding of args and inbl, the switch
// header, other case labels, returns and closing braces) are absent from this
// listing; the comments cover only the visible statements.
1528 int KvFlatBtreeAsync::remove_op(const string
&key
, index_data
&idata
,
1529 index_data
&next_idata
) {
1534 args
.omap
.insert(key
);
1537 librados::ObjectWriteOperation owo
;
1538 owo
.exec("kvs", "omap_remove", inbl
);
// Fault-injection hook, checked before the write is issued.
1539 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 ) {
1540 if (verbose
) cout
<< client_name
<< " IS SUICIDING!" << std::endl
;
1543 if (verbose
) cout
<< "\t" << client_name
<< ": removing " << key
<< " from "
1546 int err
= io_ctx
.operate(idata
.obj
, &owo
);
1548 if (verbose
) cout
<< "\t" << client_name
<< ": writing obj failed with "
1549 << err
<< std::endl
;
1552 //the key does not exist in the object
1555 case -EKEYREJECTED
: {
1556 //the object needs to be rebalanced (the log line and call below use rebalance, not split).
1558 if (verbose
) cerr
<< "\t" << client_name
<< ": running rebalance on "
1559 << idata
.obj
<< std::endl
;
1560 err
= read_index(key
, &idata
, &next_idata
, true);
1562 if (verbose
) cout
<< "\t" << client_name
1563 << ": getting oid failed with code "
1564 << err
<< std::endl
;
1567 err
= rebalance(idata
, next_idata
);
// -ENOENT and -EBALANCE are not treated as rebalance failures here; any other
// negative code is passed to handle_set_rm_errors().
1568 if (err
< 0 && err
!= -ENOENT
&& err
!= -EBALANCE
) {
1569 if (verbose
) cerr
<< "\t" << client_name
<< ": rebalance returned "
1570 << err
<< std::endl
;
1571 int ret
= handle_set_rm_errors(err
, idata
.obj
, key
, &idata
,
1575 if (verbose
) cout
<< client_name
<< " IS SUICIDING!" << std::endl
;
1579 return remove_op(key
, idata
, next_idata
);
1585 //this is the only node, so it's ok to go below k.
// A fresh write op (shadowing the outer owo) repeats the "omap_remove" exec;
// the args used here are set up on lines absent from this listing.
1586 librados::ObjectWriteOperation owo
;
1590 args
.omap
.insert(key
);
1592 owo
.exec("kvs", "omap_remove", inbl
);
1593 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)()
1595 if (verbose
) cout
<< client_name
<< " IS SUICIDING!"
1599 if (verbose
) cout
<< "\t" << client_name
<< ": removing " << key
1603 int err
= io_ctx
.operate(idata
.obj
, &owo
);
// The enclosing do/while repeats while err is negative and is neither
// -EBALANCE nor -ENOENT.
1609 } while (err
< 0 && err
!= -EBALANCE
&& err
!= -ENOENT
);
1610 err
= read_index(key
, &idata
, &next_idata
, true);
1612 if (verbose
) cout
<< "\t" << client_name
1613 << ": getting oid failed with code "
1614 << err
<< std::endl
;
// For -ENOENT and -EACCES the index entries are re-read and the removal is
// retried.
1620 if (err
== -ENOENT
|| err
== -EACCES
) {
1621 err
= read_index(key
, &idata
, &next_idata
, true);
1623 if (verbose
) cout
<< "\t" << client_name
1624 << ": getting oid failed with code "
1625 << err
<< std::endl
;
1628 if (verbose
) cout
<< "\t" << client_name
<< ": index data is "
1630 << ", object is " << idata
.obj
<< std::endl
;
1631 //idea: we read the time every time we read the index anyway - store it.
1632 return remove_op(key
, idata
, next_idata
);
// KvFlatBtreeAsync::handle_set_rm_errors(err, obj, ..., idata, next_idata):
// shared error triage used by set_op() and remove_op() after a failed split
// or rebalance.
//   err        - in/out (taken by reference); overwritten with the result of
//                read_index() on the re-read path.
//   obj        - object name the caller was operating on.
//   idata,
//   next_idata - index entries refreshed by read_index(); callers in this file
//                pass NULL for next_idata from set_op().
// Visible return value: 1 ("repeat") when the refreshed idata->obj differs
// from obj. Other return paths are on lines absent from this listing, as is
// one parameter line of the signature (a `key` is used below).
1641 int KvFlatBtreeAsync::handle_set_rm_errors(int &err
, string obj
,
1643 index_data
* idata
, index_data
* next_idata
) {
// -ESUICIDE has its own branch; its body is not visible here.
1644 if (err
== -ESUICIDE
) {
1646 } else if (err
== -ECANCELED
//if an object was unwritable or index changed
1647 || err
== -EPREFIX
//if there is currently a prefix
1648 || err
== -ETIMEDOUT
// if the index changes during the op - i.e. cleanup
1649 || err
== -EACCES
) //possible if we were acting on old index data
// For the four codes above, refresh the index entry for key.
1651 err
= read_index(key
, idata
, next_idata
, true);
1655 if (verbose
) cout
<< "\t" << client_name
<< ": prefix is " << idata
->str()
1657 if (idata
->obj
!= obj
) {
1658 //someone else has split or cleaned up or something. start over.
1659 return 1;//meaning repeat
// Anything other than -ETIMEDOUT, -ERANGE, -EACCES or -EUCLEAN is logged as
// unexpected.
1661 } else if (err
!= -ETIMEDOUT
&& err
!= -ERANGE
&& err
!= -EACCES
1662 && err
!= -EUCLEAN
){
1663 if (verbose
) cout
<< "\t" << client_name
1664 << ": split encountered an unexpected error: " << err
// KvFlatBtreeAsync::get(key, val): entry point for reading one key.
//   key - key to look up.
//   val - output bufferlist, forwarded to get_op().
// Visible flow: check the fault-injection hook, resolve the index entry with
// read_index(), record the current time in mytime, then call get_op().
// NOTE: several original lines (declarations of err, idata and mytime, error
// returns, final return) are absent from this listing.
1671 int KvFlatBtreeAsync::get(const string
&key
, bufferlist
*val
) {
1673 if (verbose
) cout
<< client_name
<< ": getting " << key
<< std::endl
;
1678 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 ) {
1681 err
= read_index(key
, &idata
, NULL
, false);
// NOTE(review): no use of mytime is visible in this listing -- confirm whether
// it is still needed.
1682 mytime
= ceph_clock_now();
1684 if (verbose
) cout
<< "getting oid failed with code " << err
<< std::endl
;
1688 err
= get_op(key
, val
, idata
);
1690 if (verbose
) cout
<< client_name
<< ": got " << key
<< " with " << err
// KvFlatBtreeAsync::get_op(key, val, idata): reads key from the omap of the
// object named by idata.obj.
//   val   - output bufferlist; the copy from the fetched omap into *val is on
//           a line absent from this listing.
//   idata - in/out: refreshed by read_index() when the object is missing.
// Visible flow: issue omap_get_vals_by_keys for the single key; on -ENOENT
// re-read the index entry and call get_op() again; other errors are logged as
// unexpected.
// NOTE: several original lines (declaration of err, returns, closing braces)
// are absent from this listing.
1696 int KvFlatBtreeAsync::get_op(const string
&key
, bufferlist
*val
,
1697 index_data
&idata
) {
1699 std::set
<std::string
> key_set
;
1700 key_set
.insert(key
);
1701 map
<std::string
,bufferlist
> omap
;
1702 librados::ObjectReadOperation read
;
// err is passed as the per-op result pointer and is then overwritten with the
// return value of operate().
1703 read
.omap_get_vals_by_keys(key_set
, &omap
, &err
);
1704 err
= io_ctx
.operate(idata
.obj
, &read
, NULL
);
// -ENOENT on the data object: refresh idata and retry.
1706 if (err
== -ENOENT
) {
1707 err
= read_index(key
, &idata
, NULL
, true);
1709 if (verbose
) cout
<< "\t" << client_name
1710 << ": getting oid failed with code "
1711 << err
<< std::endl
;
1714 if (verbose
) cout
<< "\t" << client_name
<< ": index data is "
1716 << ", object is " << idata
.obj
<< std::endl
;
1717 return get_op(key
, val
, idata
);
1719 if (verbose
) cout
<< client_name
1720 << ": get encountered an unexpected error: " << err
// KvFlatBtreeAsync::pset(ptr): thread entry point used by aio_set().
//   ptr - an aio_set_args* passed through pthread_create as void*.
// Runs set() on args->kvba with the stored key/value/exclusive flag, then
// invokes the user callback with (args->err, args->cb_args).
// NOTE(review): args is heap-allocated in aio_set(); no delete is visible in
// this listing (lines are missing) -- confirm it is freed here.
1730 void *KvFlatBtreeAsync::pset(void *ptr
) {
1731 struct aio_set_args
*args
= (struct aio_set_args
*)ptr
;
1733 args
->kvba
->KvFlatBtreeAsync::set((string
)args
->key
,
1734 (bufferlist
)args
->val
, (bool)args
->exc
);
1735 args
->cb(args
->err
, args
->cb_args
);
// KvFlatBtreeAsync::aio_set(key, val, exclusive, cb, cb_args, err):
// asynchronous set. Packs the arguments into a heap-allocated aio_set_args
// and starts a pthread running pset() on it.
//   exclusive - stored in args->exc.
//   cb        - callback; pset() invokes args->cb after the set.
//   cb_args   - stored in args->cb_args and handed back to the callback.
//   err       - int* out-parameter; its assignment into args is on a line
//               absent from this listing.
// NOTE(review): the handling of a non-zero pthread_create result (and of args
// in that case) is not visible here -- confirm args is not leaked on failure.
1740 void KvFlatBtreeAsync::aio_set(const string
&key
, const bufferlist
&val
,
1741 bool exclusive
, callback cb
, void * cb_args
, int * err
) {
1742 aio_set_args
*args
= new aio_set_args();
1746 args
->exc
= exclusive
;
1748 args
->cb_args
= cb_args
;
1751 int r
= pthread_create(&t
, NULL
, pset
, (void*)args
);
// KvFlatBtreeAsync::prm(ptr): thread entry point used by aio_remove().
//   ptr - an aio_rm_args* passed through pthread_create as void*.
// Runs remove() on args->kvba with the stored key, then invokes the user
// callback with (args->err, args->cb_args).
// NOTE(review): args is heap-allocated in aio_remove(); no delete is visible
// in this listing (lines are missing) -- confirm it is freed here.
1759 void *KvFlatBtreeAsync::prm(void *ptr
) {
1760 struct aio_rm_args
*args
= (struct aio_rm_args
*)ptr
;
1762 args
->kvba
->KvFlatBtreeAsync::remove((string
)args
->key
);
1763 args
->cb(args
->err
, args
->cb_args
);
// KvFlatBtreeAsync::aio_remove(key, cb, cb_args, err): asynchronous remove.
// Packs the arguments into a heap-allocated aio_rm_args and starts a pthread
// running prm() on it.
//   cb      - callback; prm() invokes args->cb after the remove.
//   cb_args - stored in args->cb_args and handed back to the callback.
//   err     - int* out-parameter; its assignment into args is on a line
//             absent from this listing.
// NOTE(review): the handling of a non-zero pthread_create result (and of args
// in that case) is not visible here -- confirm args is not leaked on failure.
1768 void KvFlatBtreeAsync::aio_remove(const string
&key
,
1769 callback cb
, void * cb_args
, int * err
) {
1770 aio_rm_args
* args
= new aio_rm_args();
1774 args
->cb_args
= cb_args
;
1777 int r
= pthread_create(&t
, NULL
, prm
, (void*)args
);
// KvFlatBtreeAsync::pget(ptr): thread entry point used by aio_get().
//   ptr - an aio_get_args* passed through pthread_create as void*.
// Runs get() on args->kvba with the stored key and output bufferlist, then
// invokes the user callback with (args->err, args->cb_args).
// NOTE(review): args is heap-allocated in aio_get(); no delete is visible in
// this listing (lines are missing) -- confirm it is freed here.
1785 void *KvFlatBtreeAsync::pget(void *ptr
) {
1786 struct aio_get_args
*args
= (struct aio_get_args
*)ptr
;
1788 args
->kvba
->KvFlatBtreeAsync::get((string
)args
->key
,
1789 (bufferlist
*)args
->val
);
1790 args
->cb(args
->err
, args
->cb_args
);
// KvFlatBtreeAsync::aio_get(key, val, cb, cb_args, err): asynchronous get.
// Packs the arguments into a heap-allocated aio_get_args and starts a pthread
// running pget() on it.
//   val     - output bufferlist pointer; pget() passes args->val to get(), so
//             it must stay valid until the callback has run.
//   cb      - callback; pget() invokes args->cb after the get.
//   cb_args - stored in args->cb_args and handed back to the callback.
//   err     - int* out-parameter; its assignment into args is on a line
//             absent from this listing.
// NOTE(review): the handling of a non-zero pthread_create result (and of args
// in that case) is not visible here -- confirm args is not leaked on failure.
1795 void KvFlatBtreeAsync::aio_get(const string
&key
, bufferlist
*val
,
1796 callback cb
, void * cb_args
, int * err
) {
1797 aio_get_args
* args
= new aio_get_args();
1802 args
->cb_args
= cb_args
;
1805 int r
= pthread_create(&t
, NULL
, pget
, (void*)args
);
// KvFlatBtreeAsync::set_many(in_map): bulk insert of every key/value pair in
// in_map.
// Visible flow:
//   1. copy the input keys into `keys` and the pairs into `big_map`;
//   2. exec "kvs"/"read_many" on the index object to fetch the affected index
//      entries into imap;
//   3. for each entry, read the existing object (read_object) into to_delete
//      and merge its omap into big_map;
//   4. re-partition big_map into freshly named objects in to_create;
//   5. build the prefix-index op and the per-object ops, run them through
//      perform_ops(), and on success update the index cache.
// When reading an object or perform_ops() fails on the visible paths, the
// whole function is retried via `return set_many(in_map)`.
// NOTE: many original lines (declarations of err, inbl, outbl and idata,
// decode calls, error checks, returns, closing braces) are absent from this
// listing; the comments cover only the visible statements.
1813 int KvFlatBtreeAsync::set_many(const map
<string
, bufferlist
> &in_map
) {
1817 std::set
<string
> keys
;
1819 map
<string
, bufferlist
> big_map
;
1820 for (map
<string
, bufferlist
>::const_iterator it
= in_map
.begin();
1821 it
!= in_map
.end(); ++it
) {
1822 keys
.insert(it
->first
);
1823 big_map
.insert(*it
);
1826 if (verbose
) cout
<< "created key set and big_map" << std::endl
;
// Fetch the index entries through the "read_many" class method, waiting
// synchronously on the completion.
// NOTE(review): no aioc->release() is visible in this listing -- confirm the
// completion is released on one of the missing lines.
1829 librados::AioCompletion
* aioc
= rados
.aio_create_completion();
1830 io_ctx
.aio_exec(index_name
, aioc
, "kvs", "read_many", inbl
, &outbl
);
1831 aioc
->wait_for_safe();
1832 err
= aioc
->get_return_value();
1835 cerr
<< "getting index failed with " << err
<< std::endl
;
1839 map
<string
, bufferlist
> imap
;//read from the index
1840 auto blit
= outbl
.cbegin();
1843 if (verbose
) cout
<< "finished reading index for objects. there are "
1844 << imap
.size() << " entries that need to be changed. " << std::endl
;
1847 vector
<object_data
> to_delete
;
1849 vector
<object_data
> to_create
;
1851 if (verbose
) cout
<< "setting up to_delete and to_create vectors from index "
1852 << "map" << std::endl
;
1853 //set up to_delete from index map
1854 for (map
<string
, bufferlist
>::iterator it
= imap
.begin(); it
!= imap
.end();
1857 blit
= it
->second
.begin();
// Each index entry becomes one object_data in to_delete, whose contents are
// then loaded with read_object().
1859 to_delete
.push_back(object_data(idata
.min_kdata
, idata
.kdata
, idata
.obj
));
1860 err
= read_object(idata
.obj
, &to_delete
[to_delete
.size() - 1]);
1862 if (verbose
) cout
<< "reading " << idata
.obj
<< " failed with " << err
// NOTE(review): retry by unbounded recursion -- confirm a termination
// condition exists on the missing lines.
1864 return set_many(in_map
);
// map::insert does not overwrite, so for a key present in both, the value
// from in_map (inserted first) is kept over the one read from the object.
1867 big_map
.insert(to_delete
[to_delete
.size() - 1].omap
.begin(),
1868 to_delete
[to_delete
.size() - 1].omap
.end());
// Start the first new object; its lower bound is the lower bound of the first
// object being replaced.
1871 to_create
.push_back(object_data(
1872 to_string(client_name
, client_index
++)));
1873 to_create
[0].min_kdata
= to_delete
[0].min_kdata
;
1875 for(map
<string
, bufferlist
>::iterator it
= big_map
.begin();
1876 it
!= big_map
.end(); ++it
) {
// When the current object reaches 1.5 * k entries, close it at its largest key
// and open a new object whose min is the previous object's max.
// NOTE(review): 1.5 * k is a floating-point value compared with ==; if k is an
// odd integer this can never be equal -- confirm k's type and parity.
1877 if (to_create
[to_create
.size() - 1].omap
.size() == 1.5 * k
) {
1878 to_create
[to_create
.size() - 1].max_kdata
=
1879 key_data(to_create
[to_create
.size() - 1]
1880 .omap
.rbegin()->first
);
1882 to_create
.push_back(object_data(
1883 to_string(client_name
, client_index
++)));
1884 to_create
[to_create
.size() - 1].min_kdata
=
1885 to_create
[to_create
.size() - 2].max_kdata
;
1888 to_create
[to_create
.size() - 1].omap
.insert(*it
);
// The last new object inherits the upper bound of the last replaced object.
1890 to_create
[to_create
.size() - 1].max_kdata
=
1891 to_delete
[to_delete
.size() - 1].max_kdata
;
// One write op per step: the ops vector below is filled with pointers into
// owos, so owos must not be resized afterwards.
1893 vector
<librados::ObjectWriteOperation
> owos(2 + 2 * to_delete
.size()
1894 + to_create
.size());
1895 vector
<pair
<pair
<int, string
>, librados::ObjectWriteOperation
*> > ops
;
1899 set_up_prefix_index(to_create
, to_delete
, &owos
[0], &idata
, &err
);
1901 if (verbose
) cout
<< "finished making to_create and to_delete. "
// Slot 0 is the ADD_PREFIX op on the index; the remaining slots are
// placeholders that set_up_ops() fills in.
1904 ops
.push_back(make_pair(
1905 pair
<int, string
>(ADD_PREFIX
, index_name
),
1907 for (int i
= 1; i
< 2 + 2 * (int)to_delete
.size() + (int)to_create
.size();
1909 ops
.push_back(make_pair(make_pair(0,""), &owos
[i
]));
1912 set_up_ops(to_create
, to_delete
, &ops
, idata
, &err
);
1914 cout
<< "finished setting up ops. Starting critical section..." << std::endl
;
1916 /////BEGIN CRITICAL SECTION/////
1917 //put prefix on index entry for idata.val
1918 err
= perform_ops("\t\t" + client_name
+ "-set_many:", idata
, &ops
);
1920 return set_many(in_map
);
// NOTE(review): the log tag below says "-split" although this is set_many;
// it is a runtime string and is left unchanged here.
1922 if (verbose
) cout
<< "\t\t" << client_name
<< "-split: done splitting."
1924 /////END CRITICAL SECTION/////
// Bring the index cache in line with the new layout: drop the replaced
// entries and add the new ones. The matching Lock() for the Unlock() below is
// on a line absent from this listing.
1926 for (vector
<delete_data
>::iterator it
= idata
.to_delete
.begin();
1927 it
!= idata
.to_delete
.end(); ++it
) {
1928 icache
.erase(it
->max
);
1930 for (vector
<create_data
>::iterator it
= idata
.to_create
.begin();
1931 it
!= idata
.to_create
.end(); ++it
) {
1932 icache
.push(index_data(*it
));
1934 icache_lock
.Unlock();
// KvFlatBtreeAsync::remove_all(): clears the store.
// Visible flow:
//   1. read the whole omap of the index object into index_set;
//   2. replace the index omap with only the "1" entry (omap_clear followed by
//      omap_set);
//   3. iterate over the index entries that were read, special-casing the "1"
//      entry, and run a write op on each entry's idata.obj. What is queued on
//      that op is on lines absent from this listing.
// NOTE: many original lines (declaration of err and idata, decode call, the
// contents of `sub`, returns, closing braces) are absent from this listing.
1938 int KvFlatBtreeAsync::remove_all() {
1939 if (verbose
) cout
<< client_name
<< ": removing all" << std::endl
;
1941 librados::ObjectReadOperation oro
;
1942 librados::AioCompletion
* oro_aioc
= rados
.aio_create_completion();
1943 std::map
<std::string
, bufferlist
> index_set
;
1944 oro
.omap_get_vals2("",LONG_MAX
,&index_set
, nullptr, &err
);
1945 err
= io_ctx
.aio_operate(index_name
, oro_aioc
, &oro
, NULL
);
// NOTE(review): the error branch sits before wait_for_safe()/release(); if it
// returns early, oro_aioc is not released on the visible lines -- confirm.
1947 if (err
== -ENOENT
) {
1950 if (verbose
) cout
<< "getting keys failed with error " << err
<< std::endl
;
1953 oro_aioc
->wait_for_safe();
1954 oro_aioc
->release();
// Rewrite the index so that only the "1" entry remains.
// NOTE(review): operator[] on index_set inserts an empty bufferlist if "1"
// was not read back.
1956 librados::ObjectWriteOperation rm_index
;
1957 librados::AioCompletion
* rm_index_aioc
= rados
.aio_create_completion();
1958 map
<std::string
,bufferlist
> new_index
;
1959 new_index
["1"] = index_set
["1"];
1960 rm_index
.omap_clear();
1961 rm_index
.omap_set(new_index
);
// NOTE(review): get_return_value() is read directly after aio_operate() with
// no wait on rm_index_aioc visible in this listing -- confirm the operation
// has completed before its result is used.
1962 io_ctx
.aio_operate(index_name
, rm_index_aioc
, &rm_index
);
1963 err
= rm_index_aioc
->get_return_value();
1964 rm_index_aioc
->release();
1966 if (verbose
) cout
<< "rm index aioc failed with " << err
1971 if (!index_set
.empty()) {
1972 for (std::map
<std::string
,bufferlist
>::iterator it
= index_set
.begin();
1973 it
!= index_set
.end(); ++it
){
1974 librados::ObjectWriteOperation sub
;
1975 if (it
->first
== "1") {
1981 auto b
= it
->second
.cbegin();
// NOTE(review): the result of this operate() is discarded.
1983 io_ctx
.operate(idata
.obj
, &sub
);
// KvFlatBtreeAsync::get_all_keys(keys): collects every key in the store.
//   keys - output set; the omap keys of each data object are inserted into it.
// Visible flow: read the index omap, then for each index entry read the omap
// keys of idata.obj and add them to *keys. The decode of the entry into idata
// is on a line absent from this listing.
// NOTE: several original lines (declaration of err and idata, error checks,
// returns, closing braces) are absent from this listing.
1992 int KvFlatBtreeAsync::get_all_keys(std::set
<std::string
> *keys
) {
1993 if (verbose
) cout
<< client_name
<< ": getting all keys" << std::endl
;
1995 librados::ObjectReadOperation oro
;
1996 std::map
<std::string
,bufferlist
> index_set
;
// err receives the per-op result of omap_get_vals2; the return value of
// operate() itself is not captured.
1997 oro
.omap_get_vals2("",LONG_MAX
,&index_set
, nullptr, &err
);
1998 io_ctx
.operate(index_name
, &oro
, NULL
);
2000 if (verbose
) cout
<< "getting keys failed with error " << err
<< std::endl
;
2003 for (std::map
<std::string
,bufferlist
>::iterator it
= index_set
.begin();
2004 it
!= index_set
.end(); ++it
){
2005 librados::ObjectReadOperation sub
;
2006 std::set
<std::string
> ret
;
2007 sub
.omap_get_keys2("",LONG_MAX
,&ret
, nullptr, &err
);
2009 auto b
= it
->second
.cbegin();
// NOTE(review): the result of this per-object read is not checked on the
// visible lines; a failed read would contribute no keys.
2011 io_ctx
.operate(idata
.obj
, &sub
, NULL
);
2012 keys
->insert(ret
.begin(), ret
.end());
// KvFlatBtreeAsync::get_all_keys_and_values(kv_map): collects every key/value
// pair in the store.
//   kv_map - output map; the omap contents of each object read are inserted.
// Visible flow: read the omap *keys* of the index object, then for each one
// read the full omap of an object and merge it into *kv_map.
// NOTE: several original lines (declaration of err, error checks, returns,
// closing braces) are absent from this listing.
2017 int KvFlatBtreeAsync::get_all_keys_and_values(
2018 map
<std::string
,bufferlist
> *kv_map
) {
2019 if (verbose
) cout
<< client_name
<< ": getting all keys and values"
2022 librados::ObjectReadOperation first_read
;
2023 std::set
<std::string
> index_set
;
2024 first_read
.omap_get_keys2("",LONG_MAX
,&index_set
, nullptr, &err
);
2025 io_ctx
.operate(index_name
, &first_read
, NULL
);
2027 if (verbose
) cout
<< "getting keys failed with error " << err
<< std::endl
;
2030 for (std::set
<std::string
>::iterator it
= index_set
.begin();
2031 it
!= index_set
.end(); ++it
){
2032 librados::ObjectReadOperation sub
;
2033 map
<std::string
, bufferlist
> ret
;
2034 sub
.omap_get_vals2("",LONG_MAX
,&ret
, nullptr, &err
);
// NOTE(review): the index *key* (*it) is used here as the object id, whereas
// get_all_keys() decodes index_data from the index *value* and reads
// idata.obj. Confirm which is intended; the result of operate() is also
// discarded.
2035 io_ctx
.operate(*it
, &sub
, NULL
);
2036 kv_map
->insert(ret
.begin(), ret
.end());
// KvFlatBtreeAsync::is_consistent(): checks the index against the objects in
// the pool, printing "Not consistent! ..." diagnostics to cerr.
// Visible checks:
//   1. read the index omap; on the path commented "probably because the index
//      doesn't exist" any object found in the pool is reported as floating;
//   2. for each non-empty index key, decode the entry; if it has a prefix,
//      inspect each to_delete object (xattr "unwritable", version, timeout)
//      and record to_delete/to_create object names as "special";
//   3. every pool object must be the index, be listed in the index, or be
//      special, otherwise it is reported as floating;
//   4. for each indexed object, its key count must lie in [k, 2*k] (subject to
//      the exemptions in the condition below) and every key must be greater
//      than the previous entry's bound and not greater than this entry's.
// NOTE: many original lines (declarations of err, ret, prev, idata and un,
// decode calls, assignments to the result flag, returns, closing braces) are
// absent from this listing; the comments cover only the visible statements.
2041 bool KvFlatBtreeAsync::is_consistent() {
2044 if (verbose
) cout
<< client_name
<< ": checking consistency" << std::endl
;
2045 std::map
<std::string
,bufferlist
> index
;
2046 map
<std::string
, std::set
<std::string
> > sub_objs
;
2047 librados::ObjectReadOperation oro
;
2048 oro
.omap_get_vals2("",LONG_MAX
,&index
, nullptr, &err
);
2049 io_ctx
.operate(index_name
, &oro
, NULL
);
2051 //probably because the index doesn't exist - this might be ok.
2052 for (librados::NObjectIterator oit
= io_ctx
.nobjects_begin();
2053 oit
!= io_ctx
.nobjects_end(); ++oit
) {
2054 //if this executes, there are floating objects.
2055 cerr
<< "Not consistent! found floating object " << oit
->get_oid()
// parsed_index maps index key -> object name; onames holds the indexed object
// names; special_names holds objects referenced by an in-progress prefix.
2062 std::map
<std::string
, string
> parsed_index
;
2063 std::set
<std::string
> onames
;
2064 std::set
<std::string
> special_names
;
2065 for (map
<std::string
,bufferlist
>::iterator it
= index
.begin();
2066 it
!= index
.end(); ++it
) {
2067 if (it
->first
!= "") {
2069 auto b
= it
->second
.cbegin();
// A non-empty prefix means an operation is recorded on this index entry;
// examine the objects it lists.
2071 if (idata
.prefix
!= "") {
2072 for(vector
<delete_data
>::iterator dit
= idata
.to_delete
.begin();
2073 dit
!= idata
.to_delete
.end(); ++dit
) {
// NOTE(review): this `oro` shadows the function-level `oro` declared above.
// No release() of `aioc` is visible in this listing -- confirm it is released
// on each iteration.
2074 librados::ObjectReadOperation oro
;
2075 librados::AioCompletion
* aioc
= rados
.aio_create_completion();
2077 oro
.getxattr("unwritable", &un
, &err
);
2078 io_ctx
.aio_operate(dit
->obj
, aioc
, &oro
, NULL
);
2079 aioc
->wait_for_safe();
2080 err
= aioc
->get_return_value();
// Branch taken when the entry's timestamp is older than `timeout`.
2081 if (ceph_clock_now() - idata
.ts
> timeout
) {
2084 if (err
== -ENOENT
) {
2087 cerr
<< "Not consistent! reading object " << dit
->obj
2088 << "returned " << err
<< std::endl
;
// Reported when the object is not marked unwritable ("unwritable" xattr does
// not parse to 1) and its version differs from the one recorded in the prefix.
2093 if (atoi(string(un
.c_str(), un
.length()).c_str()) != 1 &&
2094 aioc
->get_version64() != dit
->version
) {
2095 cerr
<< "Not consistent! object " << dit
->obj
<< " has been "
2096 << " modified since the client died was not cleaned up."
2101 special_names
.insert(dit
->obj
);
2104 for(vector
<create_data
>::iterator cit
= idata
.to_create
.begin();
2105 cit
!= idata
.to_create
.end(); ++cit
) {
2106 special_names
.insert(cit
->obj
);
2109 parsed_index
.insert(make_pair(it
->first
, idata
.obj
));
2110 onames
.insert(idata
.obj
);
2114 //make sure that an object exists iff it either is the index
2115 //or is listed in the index
2116 for (librados::NObjectIterator oit
= io_ctx
.nobjects_begin();
2117 oit
!= io_ctx
.nobjects_end(); ++oit
) {
2118 string name
= oit
->get_oid();
2119 if (name
!= index_name
&& onames
.count(name
) == 0
2120 && special_names
.count(name
) == 0) {
2121 cerr
<< "Not consistent! found floating object " << name
<< std::endl
;
// Per-object checks: load each indexed object's key set into sub_objs.
2128 for (std::map
<std::string
, string
>::iterator it
= parsed_index
.begin();
2129 it
!= parsed_index
.end();
2131 librados::ObjectReadOperation read
;
2132 read
.omap_get_keys2("", LONG_MAX
, &sub_objs
[it
->second
], nullptr, &err
);
2133 err
= io_ctx
.operate(it
->second
, &read
, NULL
);
2134 int size_int
= (int)sub_objs
[it
->second
].size();
2136 //check that size is in the right range
// Exempt from the size check: the "1" entry, special objects, objects that do
// not exist (-ENOENT), and the case where the index has a single entry.
2137 if (it
->first
!= "1" && special_names
.count(it
->second
) == 0 &&
2138 err
!= -ENOENT
&& (size_int
> 2*k
|| size_int
< k
)
2139 && parsed_index
.size() > 1) {
2140 cerr
<< "Not consistent! Object " << *it
<< " has size " << size_int
2141 << ", which is outside the acceptable range." << std::endl
;
2145 //check that all keys belong in that object
// The index key minus its first character is used as the upper bound (the
// "1" entry has no upper bound); `prev` is the bound of the preceding entry.
2146 for(std::set
<std::string
>::iterator subit
= sub_objs
[it
->second
].begin();
2147 subit
!= sub_objs
[it
->second
].end(); ++subit
) {
2148 if ((it
->first
!= "1"
2149 && *subit
> it
->first
.substr(1,it
->first
.length()))
2150 || *subit
<= prev
) {
2151 cerr
<< "Not consistent! key " << *subit
<< " does not belong in "
2152 << *it
<< std::endl
;
2153 cerr
<< "not last element, i.e. " << it
->first
<< " not equal to 1? "
2154 << (it
->first
!= "1") << std::endl
2155 << "greater than " << it
->first
.substr(1,it
->first
.length())
2156 <<"? " << (*subit
> it
->first
.substr(1,it
->first
.length()))
2158 << "less than or equal to " << prev
<< "? "
2159 << (*subit
<= prev
) << std::endl
;
2164 prev
= it
->first
.substr(1,it
->first
.length());
2168 if (verbose
) cout
<< "failed consistency test - see error log"
2172 if (verbose
) cout
<< "passed consistency test" << std::endl
;
2177 string
KvFlatBtreeAsync::str() {
2179 ret
<< "Top-level map:" << std::endl
;
2181 std::set
<std::string
> keys
;
2182 std::map
<std::string
,bufferlist
> index
;
2183 librados::ObjectReadOperation oro
;
2184 librados::AioCompletion
* top_aioc
= rados
.aio_create_completion();
2185 oro
.omap_get_vals2("",LONG_MAX
,&index
, nullptr, &err
);
2186 io_ctx
.aio_operate(index_name
, top_aioc
, &oro
, NULL
);
2187 top_aioc
->wait_for_safe();
2188 err
= top_aioc
->get_return_value();
2189 top_aioc
->release();
2190 if (err
< 0 && err
!= -5){
2191 if (verbose
) cout
<< "getting keys failed with error " << err
<< std::endl
;
2195 ret
<< "There are no objects!" << std::endl
;
2199 for (map
<std::string
,bufferlist
>::iterator it
= index
.begin();
2200 it
!= index
.end(); ++it
) {
2201 keys
.insert(string(it
->second
.c_str(), it
->second
.length())
2202 .substr(1,it
->second
.length()));
2205 vector
<std::string
> all_names
;
2206 vector
<int> all_sizes(index
.size());
2207 vector
<int> all_versions(index
.size());
2208 vector
<bufferlist
> all_unwrit(index
.size());
2209 vector
<map
<std::string
,bufferlist
> > all_maps(keys
.size());
2210 vector
<map
<std::string
,bufferlist
>::iterator
> its(keys
.size());
2212 vector
<bool> dones(keys
.size());
2213 ret
<< std::endl
<< string(150,'-') << std::endl
;
2215 for (map
<std::string
,bufferlist
>::iterator it
= index
.begin();
2216 it
!= index
.end(); ++it
){
2218 auto b
= it
->second
.cbegin();
2220 string s
= idata
.str();
2221 ret
<< "|" << string((148 -
2222 ((*it
).first
.length()+s
.length()+3))/2,' ');
2225 ret
<< string(idata
.str());
2226 ret
<< string((148 -
2227 ((*it
).first
.length()+s
.length()+3))/2,' ');
2229 all_names
.push_back(idata
.obj
);
2230 ret
<< std::endl
<< string(150,'-') << std::endl
;
2235 //get the object names and sizes
2236 for(vector
<std::string
>::iterator it
= all_names
.begin(); it
2239 librados::ObjectReadOperation oro
;
2240 librados::AioCompletion
*aioc
= rados
.aio_create_completion();
2241 oro
.omap_get_vals2("", LONG_MAX
, &all_maps
[indexer
], nullptr, &err
);
2242 oro
.getxattr("unwritable", &all_unwrit
[indexer
], &err
);
2243 io_ctx
.aio_operate(*it
, aioc
, &oro
, NULL
);
2244 aioc
->wait_for_safe();
2245 if (aioc
->get_return_value() < 0) {
2246 ret
<< "reading" << *it
<< "failed: " << err
<< std::endl
;
2249 all_sizes
[indexer
] = all_maps
[indexer
].size();
2250 all_versions
[indexer
] = aioc
->get_version64();
2255 ret
<< "///////////////////OBJECT NAMES////////////////" << std::endl
;
2258 for (int i
= 0; i
< indexer
; i
++) {
2259 ret
<< "---------------------------\t";
2262 for (int i
= 0; i
< indexer
; i
++) {
2263 ret
<< "|" << string((25 -
2264 (string("Bucket: ").length() + all_names
[i
].length()))/2, ' ');
2265 ret
<< "Bucket: " << all_names
[i
];
2267 (string("Bucket: ").length() + all_names
[i
].length()))/2, ' ') << "|\t";
2270 for (int i
= 0; i
< indexer
; i
++) {
2271 its
[i
] = all_maps
[i
].begin();
2272 ret
<< "|" << string((25 - (string("size: ").length()
2273 + to_string("",all_sizes
[i
]).length()))/2, ' ');
2274 ret
<< "size: " << all_sizes
[i
];
2275 ret
<< string((25 - (string("size: ").length()
2276 + to_string("",all_sizes
[i
]).length()))/2, ' ') << "|\t";
2279 for (int i
= 0; i
< indexer
; i
++) {
2280 its
[i
] = all_maps
[i
].begin();
2281 ret
<< "|" << string((25 - (string("version: ").length()
2282 + to_string("",all_versions
[i
]).length()))/2, ' ');
2283 ret
<< "version: " << all_versions
[i
];
2284 ret
<< string((25 - (string("version: ").length()
2285 + to_string("",all_versions
[i
]).length()))/2, ' ') << "|\t";
2288 for (int i
= 0; i
< indexer
; i
++) {
2289 its
[i
] = all_maps
[i
].begin();
2290 ret
<< "|" << string((25 - (string("unwritable? ").length()
2292 ret
<< "unwritable? " << string(all_unwrit
[i
].c_str(),
2293 all_unwrit
[i
].length());
2294 ret
<< string((25 - (string("unwritable? ").length()
2295 + 1))/2, ' ') << "|\t";
2298 for (int i
= 0; i
< indexer
; i
++) {
2299 ret
<< "---------------------------\t";
2302 ret
<< "///////////////////THE ACTUAL BLOCKS////////////////" << std::endl
;
2306 for (int i
= 0; i
< indexer
; i
++) {
2307 ret
<< "---------------------------\t";
2310 //each time through this part is two lines
2311 while(done
< keys
.size()) {
2312 for(int i
= 0; i
< indexer
; i
++) {
2316 if (its
[i
] == all_maps
[i
].end()){
2321 ret
<< "|" << string((25 -
2322 ((*its
[i
]).first
.length()+its
[i
]->second
.length()+3))/2,' ');
2323 ret
<< (*its
[i
]).first
;
2325 ret
<< string(its
[i
]->second
.c_str(), its
[i
]->second
.length());
2327 ((*its
[i
]).first
.length()+its
[i
]->second
.length()+3))/2,' ');
2335 for (int i
= 0; i
< indexer
; i
++) {
2339 ret
<< "---------------------------\t";