2 * Key-value store using librados
6 * eleanor.cawthon@inktank.com
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
14 #include "include/compat.h"
15 #include "key_value_store/key_value_structure.h"
16 #include "key_value_store/kv_flat_btree_async.h"
17 #include "key_value_store/kvs_arg_types.h"
18 #include "include/rados/librados.hpp"
19 #include "common/ceph_context.h"
20 #include "common/Clock.h"
21 #include "include/types.h"
33 using ceph::bufferlist
;
// Reports whether this index entry counts as timed out: true only when
// `prefix` is non-empty and the time elapsed since `ts` (now - ts) is
// strictly greater than `timeout`.
36 bool index_data::is_timed_out(utime_t now
, utime_t timeout
) const {
37 return prefix
!= "" && now
- ts
> timeout
;
// NOTE(review): the body of clear() is not visible in this extract;
// presumably it empties both k2itmap and t2kmap -- confirm against the full
// file.
40 void IndexCache::clear() {
// Stores `idata` in the cache on behalf of a lookup for `key`.
// Visible steps:
//   1. the first cached entry whose key is not less than key_data(key)
//      (lower_bound) is dropped from both t2kmap and k2itmap;
//   2. if an entry for idata.kdata already exists, its old timestamp is
//      removed from t2kmap;
//   3. idata is stored in k2itmap together with the current time, and t2kmap
//      maps that time back to idata.kdata.
// NOTE(review): the bodies of the `cache_size == 0` guard and of the final
// size check are not visible in this extract (presumably an early return and
// an eviction) -- confirm.
45 void IndexCache::push(const string
&key
, const index_data
&idata
) {
46 if (cache_size
== 0) {
// Drop whatever entry currently answers lookups for `key`.
50 std::map
<key_data
, std::pair
<index_data
, utime_t
> >::iterator old_it
=
51 k2itmap
.lower_bound(key_data(key
));
52 if (old_it
!= k2itmap
.end()) {
53 t2kmap
.erase(old_it
->second
.second
);
54 k2itmap
.erase(old_it
);
// If idata.kdata is already cached, forget its previous timestamp so that
// t2kmap does not keep a stale time -> key mapping.
56 std::map
<key_data
, std::pair
<index_data
, utime_t
> >::iterator new_it
=
57 k2itmap
.find(idata
.kdata
);
58 if (new_it
!= k2itmap
.end()) {
59 utime_t old_time
= new_it
->second
.second
;
60 t2kmap
.erase(old_time
);
// Record the entry under the current time in both maps.
62 utime_t time
= ceph_clock_now();
63 k2itmap
[idata
.kdata
] = std::make_pair(idata
, time
);
64 t2kmap
[time
] = idata
.kdata
;
// Triggered once the cache holds more than cache_size entries.
65 if ((int)k2itmap
.size() > cache_size
) {
// Inserts or refreshes the cache entry for idata.kdata: an existing entry is
// removed from both maps, then idata is stored with the current time and
// t2kmap maps that time back to idata.kdata.
// NOTE(review): the bodies of the `cache_size == 0` guard and of the final
// size check are not visible in this extract -- confirm.
71 void IndexCache::push(const index_data
&idata
) {
72 if (cache_size
== 0) {
// Remove any previous entry for this key (timestamp first, then the entry).
75 if (k2itmap
.count(idata
.kdata
) > 0) {
76 utime_t old_time
= k2itmap
[idata
.kdata
].second
;
77 t2kmap
.erase(old_time
);
78 k2itmap
.erase(idata
.kdata
);
80 utime_t time
= ceph_clock_now();
81 k2itmap
[idata
.kdata
] = std::make_pair(idata
, time
);
82 t2kmap
[time
] = idata
.kdata
;
// Triggered once the cache holds more than cache_size entries.
83 if ((int)k2itmap
.size() > cache_size
) {
// Works on the first entry of t2kmap (the one with the smallest utime_t
// key), reading its timestamp and its key.
// NOTE(review): the statements that use `time` and `kdata`, and the body of
// the `cache_size == 0` guard, are not visible in this extract; presumably
// that entry is removed from both maps -- confirm.
88 void IndexCache::pop() {
89 if (cache_size
== 0) {
92 std::map
<utime_t
, key_data
>::iterator it
= t2kmap
.begin();
93 utime_t time
= it
->first
;
94 key_data kdata
= it
->second
;
// Removes the cache entry for `kdata` when one exists: its timestamp is read
// into `c` and the k2itmap entry is erased.
// NOTE(review): the use of `c` and the body of the `cache_size == 0` guard
// are not visible in this extract; presumably `c` is erased from t2kmap --
// confirm.
99 void IndexCache::erase(key_data kdata
) {
100 if (cache_size
== 0) {
103 if (k2itmap
.count(kdata
) > 0) {
104 utime_t c
= k2itmap
[kdata
].second
;
105 k2itmap
.erase(kdata
);
// Looks up the cached index entry responsible for `key` and, on a hit,
// copies it into *idata.
// A hit requires that lower_bound(key_data(key)) exists and that the entry's
// min_kdata is strictly less than key_data(key).
// NOTE(review): the return statements (for a disabled cache, an empty cache,
// a miss and a hit) are not visible in this extract.
110 int IndexCache::get(const string
&key
, index_data
*idata
) const {
111 if (cache_size
== 0) {
114 if ((int)k2itmap
.size() == 0) {
117 std::map
<key_data
, std::pair
<index_data
, utime_t
> >::const_iterator it
=
118 k2itmap
.lower_bound(key_data(key
));
// Miss: nothing at or after `key`, or the candidate's range starts at or
// after `key`.
119 if (it
== k2itmap
.end() || !(it
->second
.first
.min_kdata
< key_data(key
))) {
122 *idata
= it
->second
.first
;
// Variant of get() that also fills *next_idata with a further cached entry.
// On the visible path, *idata receives the entry `it` points at, and
// *next_idata is assigned from `it` after a check that `it` is not end().
// NOTE(review): the condition below advances `it` as a side effect
// (`++it == k2itmap.end()`); the lines that follow it, any later iterator
// movement between the two assignments, and all return statements are not
// visible in this extract -- confirm the iterator positions against the full
// file.
127 int IndexCache::get(const string
&key
, index_data
*idata
,
128 index_data
*next_idata
) const {
129 if (cache_size
== 0) {
132 std::map
<key_data
, std::pair
<index_data
, utime_t
> >::const_iterator it
=
133 k2itmap
.lower_bound(key_data(key
));
134 if (it
== k2itmap
.end() || ++it
== k2itmap
.end()) {
138 if (!(it
->second
.first
.min_kdata
< key_data(key
))){
139 //stale, should be reread.
142 *idata
= it
->second
.first
;
144 if (it
!= k2itmap
.end()) {
145 *next_idata
= it
->second
.first
;
// NOTE(review): the body of nothing() is not visible in this extract;
// presumably it is a no-op returning a fixed value -- confirm.
152 int KvFlatBtreeAsync::nothing() {
// Takes its guarded branch when rand() % 10 == 0.
// NOTE(review): what that branch does and what is returned are not visible
// in this extract.
156 int KvFlatBtreeAsync::wait() {
157 if (rand() % 10 == 0) {
// Takes its guarded branch when rand() % 10 == 0 and, if verbose, logs that
// this client "is suiciding".
// NOTE(review): the return values are not visible in this extract.
163 int KvFlatBtreeAsync::suicide() {
164 if (rand() % 10 == 0) {
165 if (verbose
) cout
<< client_name
<< " is suiciding" << std::endl
;
// Looks up an index entry relative to `idata` and writes it to *out_data.
// Visible steps: queue an omap_get_vals2 read with idata.kdata.encoded() as
// the start key and a limit of 1, run it against index_name, parse the first
// returned key into out_data->kdata, and check whether `idata` is timed out.
// NOTE(review): the declaration of `err`, the error handling after
// operate(), the decode of the returned value and the return statements are
// not visible in this extract.
171 int KvFlatBtreeAsync::next(const index_data
&idata
, index_data
* out_data
)
173 if (verbose
) cout
<< "\t\t" << client_name
<< "-next: finding next of "
177 librados::ObjectReadOperation oro
;
178 std::map
<std::string
, bufferlist
> kvs
;
179 oro
.omap_get_vals2(idata
.kdata
.encoded(),1,&kvs
, nullptr, &err
);
180 err
= io_ctx
.operate(index_name
, &oro
, NULL
);
182 if (verbose
) cout
<< "\t\t\t" << client_name
183 << "-next: getting index failed with error "
// NOTE(review): kvs.begin() is dereferenced here; whether an empty result is
// ruled out beforehand is not visible in this extract.
188 out_data
->kdata
.parse(kvs
.begin()->first
);
189 auto b
= kvs
.begin()->second
.cbegin();
191 if (idata
.is_timed_out(ceph_clock_now(), timeout
)) {
192 if (verbose
) cout
<< client_name
<< " THINKS THE OTHER CLIENT DIED."
194 //the client died after deleting the object. clean up.
// Asks the index object for an entry relative to `idata` and writes it to
// *out_data.
// Visible steps: encode `idata` into idata_from_idata_args, call the "kvs"
// class method "get_prev_idata" on index_name, run cleanup(idata, err) when
// `idata` is timed out, then copy in_args.next_idata into *out_data.
// NOTE(review): the declarations of err/inbl/outbl, the decode of `outbl`
// and the return statements are not visible in this extract.
203 int KvFlatBtreeAsync::prev(const index_data
&idata
, index_data
* out_data
)
205 if (verbose
) cout
<< "\t\t" << client_name
<< "-prev: finding prev of "
206 << idata
.str() << std::endl
;
// Build the request payload for the object-class call.
209 idata_from_idata_args in_args
;
210 in_args
.idata
= idata
;
211 in_args
.encode(inbl
);
213 err
= io_ctx
.exec(index_name
,"kvs", "get_prev_idata", inbl
, outbl
);
215 if (verbose
) cout
<< "\t\t\t" << client_name
216 << "-prev: getting index failed with error "
218 if (idata
.is_timed_out(ceph_clock_now(), timeout
)) {
219 if (verbose
) cout
<< client_name
<< " THINKS THE OTHER CLIENT DIED."
221 //the client died after deleting the object. clean up.
222 err
= cleanup(idata
, err
);
223 if (err
== -ESUICIDE
) {
231 auto it
= outbl
.cbegin();
233 *out_data
= in_args
.next_idata
;
234 if (verbose
) cout
<< "\t\t" << client_name
<< "-prev: prev is "
// Resolves the index entry responsible for `key` into *idata and, when
// next_idata is non-null, a following entry into *next_idata.
// Visible flow:
//   - a cache path that calls icache.get() (3-arg form when next_idata is
//     non-null, 2-arg form otherwise);
//   - an object path that reads index_name with two omap reads (the exact
//     key via omap_get_vals_by_keys, and a range via omap_get_vals2), merges
//     the results, runs cleanup() for entries that are timed out and retries
//     via a recursive call, pushes decoded entries into icache, and parses
//     the first returned key into *idata.
// NOTE(review): the use of `force_update`, the acquisition of icache_lock
// before the explicit unlock() calls, the error checks and the return
// statements are not visible in this extract.
240 int KvFlatBtreeAsync::read_index(const string
&key
, index_data
* idata
,
241 index_data
* next_idata
, bool force_update
) {
244 if (verbose
) cout
<< "\t" << client_name
245 << "-read_index: getting index_data for " << key
246 << " from cache" << std::endl
;
248 if (next_idata
!= NULL
) {
249 err
= icache
.get(key
, idata
, next_idata
);
251 err
= icache
.get(key
, idata
);
253 icache_lock
.unlock();
256 //if (verbose) cout << "CACHE SUCCESS" << std::endl;
259 if (verbose
) cout
<< "NOT IN CACHE" << std::endl
;
263 if (verbose
) cout
<< "\t" << client_name
264 << "-read_index: getting index_data for " << key
265 << " from object" << std::endl
;
266 librados::ObjectReadOperation oro
;
268 std::set
<std::string
> key_set
;
269 key_set
.insert(key_data(key
).encoded());
270 std::map
<std::string
, bufferlist
> kvmap
;
271 std::map
<std::string
, bufferlist
> dupmap
;
// Two reads share one operation: the exact encoded key goes to dupmap, and a
// range read with a limit of max(cache_size / cache_refresh, 2) goes to
// kvmap. Both are given the same `err` slot.
272 oro
.omap_get_vals_by_keys(key_set
, &dupmap
, &err
);
273 oro
.omap_get_vals2(key_data(key
).encoded(),
274 (cache_size
/ cache_refresh
>= 2? cache_size
/ cache_refresh
: 2),
275 &kvmap
, nullptr, &err
);
276 err
= io_ctx
.operate(index_name
, &oro
, NULL
);
277 utime_t mytime
= ceph_clock_now();
279 cerr
<< "\t" << client_name
280 << "-read_index: getting keys failed with "
282 ceph_abort_msg(client_name
+ "-read_index: reading index failed");
// Merge the exact-key result into the range result, then walk the entries
// starting from the second one (++kvmap.begin()).
285 kvmap
.insert(dupmap
.begin(), dupmap
.end());
286 for (map
<string
, bufferlist
>::iterator it
= ++kvmap
.begin();
289 bufferlist bl
= it
->second
;
290 auto blit
= bl
.cbegin();
291 index_data this_idata
;
292 this_idata
.decode(blit
);
293 if (this_idata
.is_timed_out(mytime
, timeout
)) {
294 if (verbose
) cout
<< client_name
295 << " THINKS THE OTHER CLIENT DIED. (mytime is "
296 << mytime
.sec() << "." << mytime
.usec() << ", idata.ts is "
297 << this_idata
.ts
.sec() << "." << this_idata
.ts
.usec()
298 << ", it has been " << (mytime
- this_idata
.ts
).sec()
299 << '.' << (mytime
- this_idata
.ts
).usec()
300 << ", timeout is " << timeout
<< ")" << std::endl
;
301 //the client died after deleting the object. clean up.
302 if (cleanup(this_idata
, -EPREFIX
) == -ESUICIDE
) {
// After cleaning up, start the whole lookup again.
305 return read_index(key
, idata
, next_idata
, force_update
);
307 std::scoped_lock l
{icache_lock
};
308 icache
.push(this_idata
);
// The first entry of kvmap is the one reported as *idata.
310 auto b
= kvmap
.begin()->second
.cbegin();
312 idata
->kdata
.parse(kvmap
.begin()->first
);
313 if (verbose
) cout
<< "\t" << client_name
<< "-read_index: kvmap_size is "
315 << ", idata is " << idata
->str() << std::endl
;
317 ceph_assert(idata
->obj
!= "");
319 icache
.push(key
, *idata
);
320 icache_lock
.unlock();
// Only look for a following entry when the caller asked for one and the
// current entry's key prefix is not "1".
322 if (next_idata
!= NULL
&& idata
->kdata
.prefix
!= "1") {
323 next_idata
->kdata
.parse((++kvmap
.begin())->first
);
324 auto nb
= (++kvmap
.begin())->second
.cbegin();
325 next_idata
->decode(nb
);
326 std::scoped_lock l
{icache_lock
};
327 icache
.push(*next_idata
);
// Splits the object described by `idata` into two newly named objects.
// Visible flow:
//   - read the object through read_object() with bound 2*k - 1 and
//     comparator CEPH_OSD_CMPXATTR_OP_GT;
//   - build to_delete (the old object) and to_create (a lower-half object
//     filled by a k-iteration loop, and an upper-half object holding every
//     entry after the lower half's last key);
//   - prepare write operations with set_up_prefix_index() and set_up_ops(),
//     then run them with perform_ops();
//   - afterwards drop the deleted ranges from icache and push the created
//     ones.
// NOTE(review): the declarations of err/args/out_data, the error checks, the
// iterator advance inside the k-iteration loop, the acquisition of
// icache_lock before its unlock(), and the return statements are not visible
// in this extract.
332 int KvFlatBtreeAsync::split(const index_data
&idata
) {
336 if (idata
.prefix
!= "") {
341 args
.bound
= 2 * k
- 1;
342 args
.comparator
= CEPH_OSD_CMPXATTR_OP_GT
;
343 err
= read_object(idata
.obj
, &args
);
344 args
.odata
.max_kdata
= idata
.kdata
;
346 if (verbose
) cout
<< "\t\t" << client_name
<< "-split: read object "
348 << " got " << err
<< std::endl
;
352 if (verbose
) cout
<< "\t\t" << client_name
<< "-split: splitting "
354 << ", which has size " << args
.odata
.size
355 << " and actual size " << args
.odata
.omap
.size() << std::endl
;
357 ///////preparations that happen outside the critical section
359 vector
<object_data
> to_create
;
360 vector
<object_data
> to_delete
;
361 to_delete
.push_back(object_data(idata
.min_kdata
,
362 args
.odata
.max_kdata
, args
.odata
.name
, args
.odata
.version
));
364 //for lower half object
365 std::map
<std::string
, bufferlist
>::const_iterator it
= args
.odata
.omap
.begin();
// client_index is incremented under client_index_lock while building the
// new object's name.
366 client_index_lock
.lock();
367 to_create
.push_back(object_data(to_string(client_name
, client_index
++)));
368 client_index_lock
.unlock();
369 for (int i
= 0; i
< k
; i
++) {
370 to_create
[0].omap
.insert(*it
);
373 to_create
[0].min_kdata
= idata
.min_kdata
;
374 to_create
[0].max_kdata
= key_data(to_create
[0].omap
.rbegin()->first
);
376 //for upper half object
377 client_index_lock
.lock();
378 to_create
.push_back(object_data(to_create
[0].max_kdata
,
379 args
.odata
.max_kdata
,
380 to_string(client_name
, client_index
++)));
381 client_index_lock
.unlock();
// Everything strictly after the lower half's last key goes to the upper half.
382 to_create
[1].omap
.insert(
383 ++args
.odata
.omap
.find(to_create
[0].omap
.rbegin()->first
),
384 args
.odata
.omap
.end());
386 //setting up operations
387 librados::ObjectWriteOperation owos
[6];
388 vector
<std::pair
<std::pair
<int, string
>, librados::ObjectWriteOperation
*> > ops
;
// owos[0] carries the prefix-index operation; owos[1..5] are handed to
// set_up_ops() as placeholder slots.
390 set_up_prefix_index(to_create
, to_delete
, &owos
[0], &out_data
, &err
);
391 ops
.push_back(std::make_pair(
392 std::pair
<int, string
>(ADD_PREFIX
, index_name
),
394 for (int i
= 1; i
< 6; i
++) {
395 ops
.push_back(std::make_pair(std::make_pair(0,""), &owos
[i
]));
397 set_up_ops(to_create
, to_delete
, &ops
, out_data
, &err
);
399 /////BEGIN CRITICAL SECTION/////
400 //put prefix on index entry for idata.val
401 err
= perform_ops("\t\t" + client_name
+ "-split:", out_data
, &ops
);
405 if (verbose
) cout
<< "\t\t" << client_name
<< "-split: done splitting."
407 /////END CRITICAL SECTION/////
// Bring the cache in line with the new index contents.
409 for (vector
<delete_data
>::iterator it
= out_data
.to_delete
.begin();
410 it
!= out_data
.to_delete
.end(); ++it
) {
411 icache
.erase(it
->max
);
413 for (vector
<create_data
>::iterator it
= out_data
.to_create
.begin();
414 it
!= out_data
.to_create
.end(); ++it
) {
415 icache
.push(index_data(*it
));
417 icache_lock
.unlock();
// Rebalances or merges the object described by `idata1` with a neighbouring
// object.
// Visible flow:
//   - pick the neighbour: when idata1.kdata.prefix == "1" the neighbour is
//     found with prev(); otherwise `next_idata` is used;
//   - read both objects through read_object() (comparator
//     CEPH_OSD_CMPXATTR_OP_LT) into args1 / args2;
//   - stop early when both sizes lie in (k, 2k], or when either index entry
//     carries a prefix;
//   - if the combined size is <= 2k, merge both omaps into one new object
//     (o2w); otherwise distribute the entries over two new objects (o1w,
//     o2w) around target_size_1;
//   - queue both old objects for deletion, prepare the operations with
//     set_up_prefix_index() / set_up_ops(), run them with perform_ops(), and
//     refresh icache.
// NOTE(review): the declarations of err/out_data, the error checks, the
// branch bodies that return, the tail of several expressions, and the
// acquisition of icache_lock before its unlock() are not visible in this
// extract.
421 int KvFlatBtreeAsync::rebalance(const index_data
&idata1
,
422 const index_data
&next_idata
){
426 if (idata1
.prefix
!= "") {
430 rebalance_args args1
;
432 args1
.comparator
= CEPH_OSD_CMPXATTR_OP_LT
;
433 index_data idata2
= next_idata
;
435 rebalance_args args2
;
437 args2
.comparator
= CEPH_OSD_CMPXATTR_OP_LT
;
439 if (idata1
.kdata
.prefix
== "1") {
440 //this is the highest key in the index, so it doesn't have a next.
442 //read the index for the previous entry
443 err
= prev(idata1
, &idata2
);
444 if (err
== -ERANGE
) {
445 if (verbose
) cout
<< "\t\t" << client_name
446 << "-rebalance: this is the only node, "
447 << "so aborting" << std::endl
;
449 } else if (err
< 0) {
453 //read the first object
// In this branch idata1 is read into args2 and the previous entry (idata2)
// into args1.
454 err
= read_object(idata1
.obj
, &args2
);
456 if (verbose
) cout
<< "reading " << idata1
.obj
<< " failed with " << err
458 if (err
== -ENOENT
) {
463 args2
.odata
.min_kdata
= idata1
.min_kdata
;
464 args2
.odata
.max_kdata
= idata1
.kdata
;
466 //read the second object
467 args1
.bound
= 2 * k
+ 1;
468 err
= read_object(idata2
.obj
, &args1
);
// NOTE(review): this message names idata1.obj although the read just above
// was for idata2.obj -- the log text looks wrong; confirm and fix in code.
470 if (verbose
) cout
<< "reading " << idata1
.obj
<< " failed with " << err
474 args1
.odata
.min_kdata
= idata2
.min_kdata
;
475 args1
.odata
.max_kdata
= idata2
.kdata
;
477 if (verbose
) cout
<< "\t\t" << client_name
<< "-rebalance: read "
479 << ". size: " << args1
.odata
.size
<< " version: "
480 << args1
.odata
.version
// NOTE(review): plain assert here, while the rest of the file uses
// ceph_assert -- confirm whether that is intentional.
483 assert (next_idata
.obj
!= "");
484 //there is a next key, so get it.
485 err
= read_object(idata1
.obj
, &args1
);
487 if (verbose
) cout
<< "reading " << idata1
.obj
<< " failed with " << err
491 args1
.odata
.min_kdata
= idata1
.min_kdata
;
492 args1
.odata
.max_kdata
= idata1
.kdata
;
494 args2
.bound
= 2 * k
+ 1;
495 err
= read_object(idata2
.obj
, &args2
);
// NOTE(review): as above, the message names idata1.obj after a read of
// idata2.obj.
497 if (verbose
) cout
<< "reading " << idata1
.obj
<< " failed with " << err
499 if (err
== -ENOENT
) {
504 args2
.odata
.min_kdata
= idata2
.min_kdata
;
505 args2
.odata
.max_kdata
= idata2
.kdata
;
507 if (verbose
) cout
<< "\t\t" << client_name
<< "-rebalance: read "
509 << ". size: " << args2
.odata
.size
<< " version: "
510 << args2
.odata
.version
514 if (verbose
) cout
<< "\t\t" << client_name
<< "-rebalance: o1 is "
515 << args1
.odata
.max_kdata
.encoded() << ","
516 << args1
.odata
.name
<< " with size " << args1
.odata
.size
517 << " , o2 is " << args2
.odata
.max_kdata
.encoded()
518 << "," << args2
.odata
.name
<< " with size " << args2
.odata
.size
// Nothing to do when both objects already hold more than k and at most 2k
// entries.
522 if ((int)args1
.odata
.size
> k
&& (int)args1
.odata
.size
<= 2*k
523 && (int)args2
.odata
.size
> k
524 && (int)args2
.odata
.size
<= 2*k
) {
526 if (verbose
) cout
<< "\t\t" << client_name
527 << "-rebalance: both sizes in range, so"
528 << " aborting " << std::endl
;
530 } else if (idata1
.prefix
!= "" || idata2
.prefix
!= "") {
534 //this is the high object. it gets created regardless of rebalance or merge.
535 client_index_lock
.lock();
536 string o2w
= to_string(client_name
, client_index
++);
537 client_index_lock
.unlock();
539 vector
<object_data
> to_create
;
540 vector
<object_data
> to_delete
;
541 librados::ObjectWriteOperation create
[2];//possibly only 1 will be used
542 librados::ObjectWriteOperation other_ops
[6];
543 vector
<std::pair
<std::pair
<int, string
>, librados::ObjectWriteOperation
*> > ops
;
544 ops
.push_back(std::make_pair(
545 std::pair
<int, string
>(ADD_PREFIX
, index_name
),
// Merge case: everything fits into a single object of at most 2k entries.
548 if ((int)args1
.odata
.size
+ (int)args2
.odata
.size
<= 2*k
) {
550 if (verbose
) cout
<< "\t\t" << client_name
<< "-rebalance: merging "
552 << " and " << args2
.odata
.name
<< " to get " << o2w
554 std::map
<string
, bufferlist
> write2_map
;
555 write2_map
.insert(args1
.odata
.omap
.begin(), args1
.odata
.omap
.end());
556 write2_map
.insert(args2
.odata
.omap
.begin(), args2
.odata
.omap
.end());
557 to_create
.push_back(object_data(args1
.odata
.min_kdata
,
558 args2
.odata
.max_kdata
, o2w
, write2_map
));
559 ops
.push_back(std::make_pair(
560 std::pair
<int, std::string
>(MAKE_OBJECT
, o2w
),
562 ceph_assert((int)write2_map
.size() <= 2*k
);
// Rebalance case: spread the entries over two new objects.
565 if (verbose
) cout
<< "\t\t" << client_name
<< "-rebalance: rebalancing "
567 << " and " << args2
.odata
.name
<< std::endl
;
568 std::map
<std::string
, bufferlist
> write1_map
;
569 std::map
<std::string
, bufferlist
> write2_map
;
570 std::map
<std::string
, bufferlist
>::iterator it
;
571 client_index_lock
.lock();
572 string o1w
= to_string(client_name
, client_index
++);
573 client_index_lock
.unlock();
// NOTE(review): the divisor of the ceil()/floor() expressions below is not
// visible in this extract.
574 int target_size_1
= ceil(((int)args1
.odata
.size
+ (int)args2
.odata
.size
)
576 if (args1
.odata
.max_kdata
!= idata1
.kdata
) {
577 //this should be true if idata1 is the high object
578 target_size_1
= floor(((int)args1
.odata
.size
+ (int)args2
.odata
.size
)
581 for (it
= args1
.odata
.omap
.begin();
582 it
!= args1
.odata
.omap
.end() && (int)write1_map
.size()
585 write1_map
.insert(*it
);
587 if (it
!= args1
.odata
.omap
.end()){
588 //write1_map is full, so put the rest in write2_map
589 write2_map
.insert(it
, args1
.odata
.omap
.end());
590 write2_map
.insert(args2
.odata
.omap
.begin(), args2
.odata
.omap
.end());
592 //args1.odata.omap was small, and write2_map still needs more
593 std::map
<std::string
, bufferlist
>::iterator it2
;
594 for(it2
= args2
.odata
.omap
.begin();
595 (it2
!= args2
.odata
.omap
.end()) && ((int)write1_map
.size()
598 write1_map
.insert(*it2
);
600 write2_map
.insert(it2
, args2
.odata
.omap
.end());
602 if (verbose
) cout
<< "\t\t" << client_name
603 << "-rebalance: write1_map has size "
604 << write1_map
.size() << ", write2_map.size() is " << write2_map
.size()
606 //at this point, write1_map and write2_map should have the correct pairs
607 to_create
.push_back(object_data(args1
.odata
.min_kdata
,
608 key_data(write1_map
.rbegin()->first
),
610 to_create
.push_back(object_data( key_data(write1_map
.rbegin()->first
),
611 args2
.odata
.max_kdata
, o2w
, write2_map
));
612 ops
.push_back(std::make_pair(
613 std::pair
<int, std::string
>(MAKE_OBJECT
, o1w
),
615 ops
.push_back(std::make_pair(
616 std::pair
<int, std::string
>(MAKE_OBJECT
, o2w
),
// Both original objects are replaced in either case.
620 to_delete
.push_back(object_data(args1
.odata
.min_kdata
,
621 args1
.odata
.max_kdata
, args1
.odata
.name
, args1
.odata
.version
));
622 to_delete
.push_back(object_data(args2
.odata
.min_kdata
,
623 args2
.odata
.max_kdata
, args2
.odata
.name
, args2
.odata
.version
));
624 for (int i
= 1; i
< 6; i
++) {
625 ops
.push_back(std::make_pair(std::make_pair(0,""), &other_ops
[i
]));
629 set_up_prefix_index(to_create
, to_delete
, &other_ops
[0], &out_data
, &err
);
630 set_up_ops(to_create
, to_delete
, &ops
, out_data
, &err
);
632 //at this point, all operations should be completely set up.
633 /////BEGIN CRITICAL SECTION/////
634 err
= perform_ops("\t\t" + client_name
+ "-rebalance:", out_data
, &ops
);
// Bring the cache in line with the new index contents.
639 for (vector
<delete_data
>::iterator it
= out_data
.to_delete
.begin();
640 it
!= out_data
.to_delete
.end(); ++it
) {
641 icache
.erase(it
->max
);
643 for (vector
<create_data
>::iterator it
= out_data
.to_create
.begin();
644 it
!= out_data
.to_create
.end(); ++it
) {
645 icache
.push(index_data(*it
));
647 icache_lock
.unlock();
648 if (verbose
) cout
<< "\t\t" << client_name
<< "-rebalance: done rebalancing."
650 /////END CRITICAL SECTION/////
// Reads object `obj` into *odata.
// Visible steps: queue an omap read (empty start key, limit LONG_MAX) into
// odata->omap and a getxattr of "unwritable", submit them with aio_operate,
// wait for completion, then fill odata->unwritable (true when the xattr
// equals "1"), odata->version and odata->size (the number of omap entries).
// NOTE(review): the declarations of err/unw_bl, the error handling, the
// return statements and any release of `obj_aioc` are not visible in this
// extract -- confirm the completion is released on every path.
654 int KvFlatBtreeAsync::read_object(const string
&obj
, object_data
* odata
) {
655 librados::ObjectReadOperation get_obj
;
656 librados::AioCompletion
* obj_aioc
= rados
.aio_create_completion();
660 get_obj
.omap_get_vals2("", LONG_MAX
, &odata
->omap
, nullptr, &err
);
661 get_obj
.getxattr("unwritable", &unw_bl
, &err
);
662 io_ctx
.aio_operate(obj
, obj_aioc
, &get_obj
, NULL
);
663 obj_aioc
->wait_for_complete();
664 err
= obj_aioc
->get_return_value();
666 //possibly -ENOENT, meaning someone else deleted it.
670 odata
->unwritable
= string(unw_bl
.c_str(), unw_bl
.length()) == "1";
671 odata
->version
= obj_aioc
->get_version64();
672 odata
->size
= odata
->omap
.size();
// Reads object `obj` through the "kvs" object-class method
// "maybe_read_for_balance" and records the object's name and version in
// args->odata.
// NOTE(review): the encoding of `inbl`, the decode of `outbl`, the error
// handling, the return statements and any release of completion `a` are not
// visible in this extract.
677 int KvFlatBtreeAsync::read_object(const string
&obj
, rebalance_args
* args
) {
682 librados::AioCompletion
* a
= rados
.aio_create_completion();
683 io_ctx
.aio_exec(obj
, a
, "kvs", "maybe_read_for_balance", inbl
, &outbl
);
684 a
->wait_for_complete();
685 err
= a
->get_return_value();
687 if (verbose
) cout
<< "\t\t" << client_name
688 << "-read_object: reading failed with "
693 auto it
= outbl
.cbegin();
695 args
->odata
.name
= obj
;
696 args
->odata
.version
= a
->get_version64();
// Fills *idata and *owo with what is needed to mark index entries as being
// modified.
// Visible steps:
//   - stamp idata->ts with the current time;
//   - copy every to_create / to_delete element into idata->to_create /
//     idata->to_delete;
//   - for each object to delete, encode idata (with obj/min_kdata/kdata set
//     to that object) as the new index value, and build an equality
//     assertion on the plain entry (min_kdata, kdata, obj only);
//   - queue omap_cmp(assertions) on *owo and, when at most two objects are
//     being created, omap_set(to_insert).
// NOTE(review): the trailing parameters of the signature (idata, err), the
// declaration of `insert` and some loop headers' increments are not visible
// in this extract.
701 void KvFlatBtreeAsync::set_up_prefix_index(
702 const vector
<object_data
> &to_create
,
703 const vector
<object_data
> &to_delete
,
704 librados::ObjectWriteOperation
* owo
,
707 std::map
<std::string
, std::pair
<bufferlist
, int> > assertions
;
708 std::map
<string
, bufferlist
> to_insert
;
710 idata
->ts
= ceph_clock_now();
711 for(vector
<object_data
>::const_iterator it
= to_create
.begin();
712 it
!= to_create
.end();
714 create_data
c(it
->min_kdata
, it
->max_kdata
, it
->name
);
715 idata
->to_create
.push_back(c
);
717 for(vector
<object_data
>::const_iterator it
= to_delete
.begin();
718 it
!= to_delete
.end();
720 delete_data
d(it
->min_kdata
, it
->max_kdata
, it
->name
, it
->version
);
721 idata
->to_delete
.push_back(d
);
// Second pass over to_delete: build the new index values and the assertions
// on the current ones.
723 for(vector
<object_data
>::const_iterator it
= to_delete
.begin();
724 it
!= to_delete
.end();
726 idata
->obj
= it
->name
;
727 idata
->min_kdata
= it
->min_kdata
;
728 idata
->kdata
= it
->max_kdata
;
730 idata
->encode(insert
);
731 to_insert
[it
->max_kdata
.encoded()] = insert
;
732 index_data this_entry
;
733 this_entry
.min_kdata
= idata
->min_kdata
;
734 this_entry
.kdata
= idata
->kdata
;
735 this_entry
.obj
= idata
->obj
;
736 assertions
[it
->max_kdata
.encoded()] = std::pair
<bufferlist
, int>
737 (to_bl(this_entry
), CEPH_OSD_CMPXATTR_OP_EQ
);
738 if (verbose
) cout
<< "\t\t\t" << client_name
739 << "-setup_prefix: will assert "
740 << this_entry
.str() << std::endl
;
742 ceph_assert(*err
== 0);
743 owo
->omap_cmp(assertions
, err
);
744 if (to_create
.size() <= 2) {
745 owo
->omap_set(to_insert
);
// Fills the placeholder slots of *ops (those after the leading ADD_PREFIX
// entries) with the operations described by `idata`.
// Visible steps, in order:
//   - when create_vector is non-empty: one UNWRITE_OBJECT slot per
//     idata.to_delete element, then one MAKE_OBJECT (or AIO_MAKE_OBJECT when
//     more than two objects are created) slot per idata.to_create element;
//   - one REMOVE_OBJECT slot per idata.to_delete element, together with an
//     equality assertion on, and removal of, its index key;
//   - a final REMOVE_PREFIX slot on index_name that carries omap_cmp (only
//     when at most two objects are created), omap_rm_keys and omap_set.
// NOTE(review): the trailing parameter of the signature (err) and the
// statements that advance `it` between slots are not visible in this
// extract.
749 //some args can be null if there are no corresponding entries in p
750 void KvFlatBtreeAsync::set_up_ops(
751 const vector
<object_data
> &create_vector
,
752 const vector
<object_data
> &delete_vector
,
753 vector
<std::pair
<std::pair
<int, string
>, librados::ObjectWriteOperation
*> > * ops
,
754 const index_data
&idata
,
756 vector
<std::pair
<std::pair
<int, string
>,
757 librados::ObjectWriteOperation
* > >::iterator it
;
759 //skip the prefixing part
// NOTE(review): this loop has no end() check; it relies on *ops containing
// at least one non-ADD_PREFIX entry.
760 for(it
= ops
->begin(); it
->first
.first
== ADD_PREFIX
; ++it
) {}
761 std::map
<string
, bufferlist
> to_insert
;
762 std::set
<string
> to_remove
;
763 std::map
<string
, std::pair
<bufferlist
, int> > assertions
;
764 if (create_vector
.size() > 0) {
// delete_vector[i] is indexed with idata.to_delete's index, so both are
// expected to be parallel.
765 for (int i
= 0; i
< (int)idata
.to_delete
.size(); ++i
) {
766 it
->first
= std::pair
<int, string
>(UNWRITE_OBJECT
, idata
.to_delete
[i
].obj
);
767 set_up_unwrite_object(delete_vector
[i
].version
, it
->second
);
771 for (int i
= 0; i
< (int)idata
.to_create
.size(); ++i
) {
772 index_data
this_entry(idata
.to_create
[i
].max
, idata
.to_create
[i
].min
,
773 idata
.to_create
[i
].obj
);
774 to_insert
[idata
.to_create
[i
].max
.encoded()] = to_bl(this_entry
);
775 if (idata
.to_create
.size() <= 2) {
776 it
->first
= std::pair
<int, string
>(MAKE_OBJECT
, idata
.to_create
[i
].obj
);
778 it
->first
= std::pair
<int, string
>(AIO_MAKE_OBJECT
, idata
.to_create
[i
].obj
);
780 set_up_make_object(create_vector
[i
].omap
, it
->second
);
783 for (int i
= 0; i
< (int)idata
.to_delete
.size(); ++i
) {
784 index_data this_entry
= idata
;
785 this_entry
.obj
= idata
.to_delete
[i
].obj
;
786 this_entry
.min_kdata
= idata
.to_delete
[i
].min
;
787 this_entry
.kdata
= idata
.to_delete
[i
].max
;
788 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-setup_ops: will assert "
789 << this_entry
.str() << std::endl
;
790 assertions
[idata
.to_delete
[i
].max
.encoded()] = std::pair
<bufferlist
, int>(
791 to_bl(this_entry
), CEPH_OSD_CMPXATTR_OP_EQ
);
792 to_remove
.insert(idata
.to_delete
[i
].max
.encoded());
793 it
->first
= std::pair
<int, string
>(REMOVE_OBJECT
, idata
.to_delete
[i
].obj
);
794 set_up_delete_object(it
->second
);
797 if ((int)idata
.to_create
.size() <= 2) {
798 it
->second
->omap_cmp(assertions
, err
);
800 it
->second
->omap_rm_keys(to_remove
);
801 it
->second
->omap_set(to_insert
);
804 it
->first
= std::pair
<int, string
>(REMOVE_PREFIX
, index_name
);
// Queues on *owo a call to the "kvs" object-class method "create_with_omap",
// passing the encoded `to_set` map as input.
// NOTE(review): the declaration of `inbl` is not visible in this extract.
807 void KvFlatBtreeAsync::set_up_make_object(
808 const std::map
<std::string
, bufferlist
> &to_set
,
809 librados::ObjectWriteOperation
*owo
) {
811 encode(to_set
, inbl
);
812 owo
->exec("kvs", "create_with_omap", inbl
);
// Queues on *owo the steps that mark an object unwritable: assert the object
// version equals `ver`, require xattr "unwritable" == "0", then set it to
// "1".
// NOTE(review): `ver` is an int, while object versions elsewhere in this
// file come from get_version64(); passing them through an int may narrow the
// value -- confirm. Lines between the visible statements are not shown in
// this extract.
815 void KvFlatBtreeAsync::set_up_unwrite_object(
816 const int &ver
, librados::ObjectWriteOperation
*owo
) {
818 owo
->assert_version(ver
);
820 owo
->cmpxattr("unwritable", CEPH_OSD_CMPXATTR_OP_EQ
, to_bl("0"));
821 owo
->setxattr("unwritable", to_bl("1"));
// Queues on *owo the steps that make an object writable again: require xattr
// "unwritable" == "1", then set it to "0".
824 void KvFlatBtreeAsync::set_up_restore_object(
825 librados::ObjectWriteOperation
*owo
) {
826 owo
->cmpxattr("unwritable", CEPH_OSD_CMPXATTR_OP_EQ
, to_bl("1"));
827 owo
->setxattr("unwritable", to_bl("0"));
// Queues on *owo a guard requiring xattr "unwritable" == "1".
// NOTE(review): the statement that follows the guard (presumably the actual
// removal) is not visible in this extract -- confirm.
830 void KvFlatBtreeAsync::set_up_delete_object(
831 librados::ObjectWriteOperation
*owo
) {
832 owo
->cmpxattr("unwritable", CEPH_OSD_CMPXATTR_OP_EQ
, to_bl("1"));
// Executes the prepared operations in *ops in order, dispatching on each
// entry's op code (it->first.first) and target object name
// (it->first.second).
// Visible behaviour per op code:
//   ADD_PREFIX      - synchronous operate() on index_name;
//   UNWRITE_OBJECT  - synchronous operate() on the object; on failure calls
//                     cleanup() with -EFIRSTOBJ or -ERANGE;
//   MAKE_OBJECT     - synchronous operate(); -EEXIST is treated as "someone
//                     thinks we died";
//   AIO_MAKE_OBJECT - asynchronous aio_operate(); once `count` reaches
//                     idata.to_create.size(), waits for every completion;
//   REMOVE_OBJECT   - synchronous operate() on the object;
//   REMOVE_PREFIX   - synchronous operate() on index_name;
//   remaining cases - a restore of the named object, and an "unknown op"
//                     fallback run against index_name.
// Before each op the `interrupt` member-function pointer is invoked and its
// result compared with 1.
// NOTE(review): the declarations of err/count, the increment of `count`, the
// `break`/`return` statements, two case labels and any release of the
// AioCompletion objects are not visible in this extract.
836 int KvFlatBtreeAsync::perform_ops(const string
&debug_prefix
,
837 const index_data
&idata
,
838 vector
<std::pair
<std::pair
<int, string
>, librados::ObjectWriteOperation
*> > *ops
) {
// One completion slot per object to create, used by AIO_MAKE_OBJECT.
840 vector
<librados::AioCompletion
*> aiocs(idata
.to_create
.size());
842 for (vector
<std::pair
<std::pair
<int, string
>,
843 librados::ObjectWriteOperation
*> >::iterator it
= ops
->begin();
844 it
!= ops
->end(); ++it
) {
845 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 ) {
848 switch (it
->first
.first
) {
849 case ADD_PREFIX
://prefixing
850 if (verbose
) cout
<< debug_prefix
<< " adding prefix" << std::endl
;
851 err
= io_ctx
.operate(index_name
, it
->second
);
853 if (verbose
) cout
<< debug_prefix
<< " prefixing the index failed with "
857 if (verbose
) cout
<< debug_prefix
<< " prefix added." << std::endl
;
859 case UNWRITE_OBJECT
://marking
860 if (verbose
) cout
<< debug_prefix
<< " marking " << it
->first
.second
862 err
= io_ctx
.operate(it
->first
.second
, it
->second
);
864 //most likely because it changed, in which case it will be -ERANGE
865 if (verbose
) cout
<< debug_prefix
<< " marking " << it
->first
.second
866 << "failed with code" << err
<< std::endl
;
// NOTE(review): this compares an object name (it->first.second) with an
// encoded key (to_delete.begin()->max.encoded()) -- confirm this is the
// intended comparison.
867 if (it
->first
.second
== (*idata
.to_delete
.begin()).max
.encoded()) {
868 if (cleanup(idata
, -EFIRSTOBJ
) == -ESUICIDE
) {
872 if (cleanup(idata
, -ERANGE
) == -ESUICIDE
) {
878 if (verbose
) cout
<< debug_prefix
<< " marked " << it
->first
.second
881 case MAKE_OBJECT
://creating
882 if (verbose
) cout
<< debug_prefix
<< " creating " << it
->first
.second
884 err
= io_ctx
.operate(it
->first
.second
, it
->second
);
886 //this can happen if someone else was cleaning up after us.
887 if (verbose
) cout
<< debug_prefix
<< " creating " << it
->first
.second
889 << " with code " << err
<< std::endl
;
890 if (err
== -EEXIST
) {
891 //someone thinks we died, so die
892 if (verbose
) cout
<< client_name
<< " is suiciding!" << std::endl
;
899 if (verbose
|| idata
.to_create
.size() > 2) {
900 cout
<< debug_prefix
<< " created object " << it
->first
.second
904 case AIO_MAKE_OBJECT
:
905 cout
<< debug_prefix
<< " launching asynchronous create "
906 << it
->first
.second
<< std::endl
;
907 aiocs
[count
] = rados
.aio_create_completion();
908 io_ctx
.aio_operate(it
->first
.second
, aiocs
[count
], it
->second
);
// Once every create has been launched, wait for them in reverse order.
910 if ((int)idata
.to_create
.size() == count
) {
911 cout
<< "starting aiowrite waiting loop" << std::endl
;
912 for (count
-= 1; count
>= 0; count
--) {
913 aiocs
[count
]->wait_for_complete();
914 err
= aiocs
[count
]->get_return_value();
916 //this can happen if someone else was cleaning up after us.
917 cerr
<< debug_prefix
<< " a create failed"
918 << " with code " << err
<< std::endl
;
919 if (err
== -EEXIST
) {
920 //someone thinks we died, so die
921 cerr
<< client_name
<< " is suiciding!" << std::endl
;
928 if (verbose
|| idata
.to_create
.size() > 2) {
929 cout
<< debug_prefix
<< " completed aio " << aiocs
.size() - count
930 << "/" << aiocs
.size() << std::endl
;
935 case REMOVE_OBJECT
://deleting
936 if (verbose
) cout
<< debug_prefix
<< " deleting " << it
->first
.second
938 err
= io_ctx
.operate(it
->first
.second
, it
->second
);
940 //if someone else called cleanup on this prefix first
941 if (verbose
) cout
<< debug_prefix
<< " deleting " << it
->first
.second
942 << "failed with code" << err
<< std::endl
;
944 if (verbose
) cout
<< debug_prefix
<< " deleted " << it
->first
.second
947 case REMOVE_PREFIX
://rewriting index
948 if (verbose
) cout
<< debug_prefix
<< " updating index " << std::endl
;
949 err
= io_ctx
.operate(index_name
, it
->second
);
951 if (verbose
) cout
<< debug_prefix
952 << " rewriting the index failed with code " << err
953 << ". someone else must have thought we died, so dying" << std::endl
;
956 if (verbose
) cout
<< debug_prefix
<< " updated index." << std::endl
;
// Restore path (its case label is not visible in this extract).
959 if (verbose
) cout
<< debug_prefix
<< " restoring " << it
->first
.second
961 err
= io_ctx
.operate(it
->first
.second
, it
->second
);
963 if (verbose
) cout
<< debug_prefix
<< "restoring " << it
->first
.second
965 << " with " << err
<< std::endl
;
968 if (verbose
) cout
<< debug_prefix
<< " restored " << it
->first
.second
// Fallback path for unrecognised op codes: the operation is run against
// index_name.
972 if (verbose
) cout
<< debug_prefix
<< " performing unknown op on "
975 err
= io_ctx
.operate(index_name
, it
->second
);
977 if (verbose
) cout
<< debug_prefix
<< " unknown op on "
979 << " failed with " << err
<< std::endl
;
982 if (verbose
) cout
<< debug_prefix
<< " unknown op on "
984 << " succeeded." << std::endl
;
992 int KvFlatBtreeAsync::cleanup(const index_data
&idata
, const int &error
) {
993 if (verbose
) cout
<< "\t\t" << client_name
<< ": cleaning up after "
997 ceph_assert(idata
.prefix
!= "");
998 std::map
<std::string
,bufferlist
> new_index
;
999 std::map
<std::string
, std::pair
<bufferlist
, int> > assertions
;
1002 //this happens if the split or rebalance failed to mark the first object,
1003 //meaning only the index needs to be changed.
1004 //restore objects that had been marked unwritable.
1005 for(vector
<delete_data
>::const_iterator it
=
1006 idata
.to_delete
.begin();
1007 it
!= idata
.to_delete
.end(); ++it
) {
1008 index_data this_entry
;
1009 this_entry
.obj
= (*it
).obj
;
1010 this_entry
.min_kdata
= it
->min
;
1011 this_entry
.kdata
= it
->max
;
1012 new_index
[it
->max
.encoded()] = to_bl(this_entry
);
1014 this_entry
.obj
= it
->obj
;
1015 this_entry
.min_kdata
= it
->min
;
1016 this_entry
.kdata
= it
->max
;
1017 if (verbose
) cout
<< "\t\t\t" << client_name
1018 << "-cleanup: will assert index contains "
1019 << this_entry
.str() << std::endl
;
1020 assertions
[it
->max
.encoded()] =
1021 std::pair
<bufferlist
, int>(to_bl(this_entry
),
1022 CEPH_OSD_CMPXATTR_OP_EQ
);
1026 librados::ObjectWriteOperation update_index
;
1027 update_index
.omap_cmp(assertions
, &err
);
1028 update_index
.omap_set(new_index
);
1029 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: updating index"
1031 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 ) {
1034 err
= io_ctx
.operate(index_name
, &update_index
);
1036 if (verbose
) cout
<< "\t\t\t" << client_name
1037 << "-cleanup: rewriting failed with "
1038 << err
<< ". returning -ECANCELED" << std::endl
;
1041 if (verbose
) cout
<< "\t\t\t" << client_name
1042 << "-cleanup: updated index. cleanup done."
1047 //this happens if a split or rebalance fails to mark an object. It is a
1048 //special case of rolling back that does not have to deal with new objects.
1050 //restore objects that had been marked unwritable.
1051 vector
<delete_data
>::const_iterator it
;
1052 for(it
= idata
.to_delete
.begin();
1053 it
!= idata
.to_delete
.end(); ++it
) {
1054 index_data this_entry
;
1055 this_entry
.obj
= (*it
).obj
;
1056 this_entry
.min_kdata
= it
->min
;
1057 this_entry
.kdata
= it
->max
;
1058 new_index
[it
->max
.encoded()] = to_bl(this_entry
);
1060 this_entry
.obj
= it
->obj
;
1061 this_entry
.min_kdata
= it
->min
;
1062 this_entry
.kdata
= it
->max
;
1063 if (verbose
) cout
<< "\t\t\t" << client_name
1064 << "-cleanup: will assert index contains "
1065 << this_entry
.str() << std::endl
;
1066 assertions
[it
->max
.encoded()] =
1067 std::pair
<bufferlist
, int>(to_bl(this_entry
),
1068 CEPH_OSD_CMPXATTR_OP_EQ
);
1070 it
= idata
.to_delete
.begin();
1071 librados::ObjectWriteOperation restore
;
1072 set_up_restore_object(&restore
);
1073 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 ) {
1076 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: restoring "
1079 err
= io_ctx
.operate(it
->obj
, &restore
);
1081 //i.e., -ECANCELED because the object was already restored by someone
1083 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: restoring "
1085 << " failed with " << err
<< std::endl
;
1087 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: restored "
1093 librados::ObjectWriteOperation update_index
;
1094 update_index
.omap_cmp(assertions
, &err
);
1095 update_index
.omap_set(new_index
);
1096 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: updating index"
1098 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 ) {
1101 err
= io_ctx
.operate(index_name
, &update_index
);
1103 if (verbose
) cout
<< "\t\t\t" << client_name
1104 << "-cleanup: rewriting failed with "
1105 << err
<< ". returning -ECANCELED" << std::endl
;
1108 if (verbose
) cout
<< "\t\t\t" << client_name
1109 << "-cleanup: updated index. cleanup done."
1114 if (verbose
) cout
<< "\t\t" << client_name
<< "-cleanup: rolling forward"
1116 //all changes were created except for updating the index and possibly
1117 //deleting the objects. roll forward.
1118 vector
<std::pair
<std::pair
<int, string
>, librados::ObjectWriteOperation
*> > ops
;
1119 vector
<librados::ObjectWriteOperation
> owos(idata
.to_delete
.size() + 1);
1120 for (int i
= 0; i
<= (int)idata
.to_delete
.size(); ++i
) {
1121 ops
.push_back(std::make_pair(std::pair
<int, std::string
>(0, ""), &owos
[i
]));
1123 set_up_ops(vector
<object_data
>(),
1124 vector
<object_data
>(), &ops
, idata
, &err
);
1125 err
= perform_ops("\t\t" + client_name
+ "-cleanup:", idata
, &ops
);
1127 if (err
== -ESUICIDE
) {
1130 if (verbose
) cout
<< "\t\t\t" << client_name
1131 << "-cleanup: rewriting failed with "
1132 << err
<< ". returning -ECANCELED" << std::endl
;
1135 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: updated index"
1140 //roll back all changes.
1141 if (verbose
) cout
<< "\t\t" << client_name
<< "-cleanup: rolling back"
1143 std::map
<std::string
,bufferlist
> new_index
;
1144 std::set
<std::string
> to_remove
;
1145 std::map
<std::string
, std::pair
<bufferlist
, int> > assertions
;
1147 //mark the objects to be created. if someone else already has, die.
1148 for(vector
<create_data
>::const_reverse_iterator it
=
1149 idata
.to_create
.rbegin();
1150 it
!= idata
.to_create
.rend(); ++it
) {
1151 librados::ObjectWriteOperation rm
;
1152 set_up_unwrite_object(0, &rm
);
1153 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 )
1157 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: marking "
1160 err
= io_ctx
.operate(it
->obj
, &rm
);
1162 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: marking "
1164 << " failed with " << err
<< std::endl
;
1166 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: marked "
1172 //restore objects that had been marked unwritable.
1173 for(vector
<delete_data
>::const_iterator it
=
1174 idata
.to_delete
.begin();
1175 it
!= idata
.to_delete
.end(); ++it
) {
1176 index_data this_entry
;
1177 this_entry
.obj
= (*it
).obj
;
1178 this_entry
.min_kdata
= it
->min
;
1179 this_entry
.kdata
= it
->max
;
1180 new_index
[it
->max
.encoded()] = to_bl(this_entry
);
1182 this_entry
.obj
= it
->obj
;
1183 this_entry
.min_kdata
= it
->min
;
1184 this_entry
.kdata
= it
->max
;
1185 if (verbose
) cout
<< "\t\t\t" << client_name
1186 << "-cleanup: will assert index contains "
1187 << this_entry
.str() << std::endl
;
1188 assertions
[it
->max
.encoded()] =
1189 std::pair
<bufferlist
, int>(to_bl(this_entry
),
1190 CEPH_OSD_CMPXATTR_OP_EQ
);
1191 librados::ObjectWriteOperation restore
;
1192 set_up_restore_object(&restore
);
1193 if (verbose
) cout
<< "\t\t\t" << client_name
1194 << "-cleanup: will assert index contains "
1195 << this_entry
.str() << std::endl
;
1196 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 )
1200 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: restoring "
1203 err
= io_ctx
.operate(it
->obj
, &restore
);
1204 if (err
== -ENOENT
) {
1205 //it had gotten far enough to be rolled forward - unmark the objects
1207 if (verbose
) cout
<< "\t\t\t" << client_name
1208 << "-cleanup: roll forward instead"
1210 for(vector
<create_data
>::const_iterator cit
=
1211 idata
.to_create
.begin();
1212 cit
!= idata
.to_create
.end(); ++cit
) {
1213 librados::ObjectWriteOperation res
;
1214 set_up_restore_object(&res
);
1215 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)()
1219 if (verbose
) cout
<< "\t\t\t" << client_name
1220 << "-cleanup: restoring " << cit
->obj
1222 err
= io_ctx
.operate(cit
->obj
, &res
);
1224 if (verbose
) cout
<< "\t\t\t" << client_name
1225 << "-cleanup: restoring "
1226 << cit
->obj
<< " failed with " << err
<< std::endl
;
1228 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: restored "
1232 return cleanup(idata
, -ENOENT
);
1233 } else if (err
< 0) {
1234 //i.e., -ECANCELED because the object was already restored by someone
1236 if (verbose
) cout
<< "\t\t\t" << client_name
1237 << "-cleanup: restoring " << it
->obj
1238 << " failed with " << err
<< std::endl
;
1240 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: restored "
1246 //remove the new objects
1247 for(vector
<create_data
>::const_reverse_iterator it
=
1248 idata
.to_create
.rbegin();
1249 it
!= idata
.to_create
.rend(); ++it
) {
1250 to_remove
.insert(it
->max
.encoded());
1251 librados::ObjectWriteOperation rm
;
1253 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 )
1257 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: removing "
1260 err
= io_ctx
.operate(it
->obj
, &rm
);
1262 if (verbose
) cout
<< "\t\t\t" << client_name
1263 << "-cleanup: failed to remove "
1264 << it
->obj
<< std::endl
;
1266 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: removed "
1273 librados::ObjectWriteOperation update_index
;
1274 update_index
.omap_cmp(assertions
, &err
);
1275 update_index
.omap_rm_keys(to_remove
);
1276 update_index
.omap_set(new_index
);
1277 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: updating index"
1279 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 ) {
1282 err
= io_ctx
.operate(index_name
, &update_index
);
1284 if (verbose
) cout
<< "\t\t\t" << client_name
1285 << "-cleanup: rewriting failed with "
1286 << err
<< ". returning -ECANCELED" << std::endl
;
1289 if (verbose
) cout
<< "\t\t\t" << client_name
1290 << "-cleanup: updated index. cleanup done."
1298 string
KvFlatBtreeAsync::to_string(string s
, int i
) {
1304 string
KvFlatBtreeAsync::get_name() {
1308 void KvFlatBtreeAsync::set_inject(injection_t inject
, int wait_time
) {
1310 wait_ms
= wait_time
;
1313 int KvFlatBtreeAsync::setup(int argc
, const char** argv
) {
1314 int r
= rados
.init(rados_id
.c_str());
1316 cerr
<< "error during init" << r
<< std::endl
;
1319 r
= rados
.conf_parse_argv(argc
, argv
);
1321 cerr
<< "error during parsing args" << r
<< std::endl
;
1324 r
= rados
.conf_parse_env(NULL
);
1326 cerr
<< "error during parsing env" << r
<< std::endl
;
1329 r
= rados
.conf_read_file(NULL
);
1331 cerr
<< "error during read file: " << r
<< std::endl
;
1334 r
= rados
.connect();
1336 cerr
<< "error during connect: " << r
<< std::endl
;
1339 r
= rados
.ioctx_create(pool_name
.c_str(), io_ctx
);
1341 cerr
<< "error creating io ctx: " << r
<< std::endl
;
1346 librados::ObjectWriteOperation make_index
;
1347 make_index
.create(true);
1348 std::map
<std::string
,bufferlist
> index_map
;
1350 idata
.obj
= client_name
;
1351 idata
.min_kdata
.raw_key
= "";
1352 idata
.kdata
= key_data("");
1353 index_map
["1"] = to_bl(idata
);
1354 make_index
.omap_set(index_map
);
1355 r
= io_ctx
.operate(index_name
, &make_index
);
1357 if (verbose
) cout
<< client_name
<< ": Making the index failed with code "
1362 if (verbose
) cout
<< client_name
<< ": created index object" << std::endl
;
1364 librados::ObjectWriteOperation make_max_obj
;
1365 make_max_obj
.create(true);
1366 make_max_obj
.setxattr("unwritable", to_bl("0"));
1367 make_max_obj
.setxattr("size", to_bl("0"));
1368 r
= io_ctx
.operate(client_name
, &make_max_obj
);
1370 if (verbose
) cout
<< client_name
<< ": Setting xattr failed with code "
1378 int KvFlatBtreeAsync::set(const string
&key
, const bufferlist
&val
,
1379 bool update_on_existing
) {
1380 if (verbose
) cout
<< client_name
<< " is "
1381 << (update_on_existing
? "updating " : "setting ")
1382 << key
<< std::endl
;
1385 index_data
idata(key
);
1387 if (verbose
) cout
<< "\t" << client_name
<< ": finding oid" << std::endl
;
1388 err
= read_index(key
, &idata
, NULL
, false);
1390 if (verbose
) cout
<< "\t" << client_name
1391 << ": getting oid failed with code "
1392 << err
<< std::endl
;
1395 if (verbose
) cout
<< "\t" << client_name
<< ": index data is " << idata
.str()
1396 << ", object is " << idata
.obj
<< std::endl
;
1398 err
= set_op(key
, val
, update_on_existing
, idata
);
1400 if (verbose
) cout
<< "\t" << client_name
<< ": finished set with " << err
1405 int KvFlatBtreeAsync::set_op(const string
&key
, const bufferlist
&val
,
1406 bool update_on_existing
, index_data
&idata
) {
1412 args
.exclusive
= !update_on_existing
;
1413 args
.omap
[key
] = val
;
1416 librados::ObjectWriteOperation owo
;
1417 owo
.exec("kvs", "omap_insert", inbl
);
1418 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 ) {
1419 if (verbose
) cout
<< client_name
<< " IS SUICIDING!" << std::endl
;
1422 if (verbose
) cout
<< "\t" << client_name
<< ": inserting " << key
1424 << idata
.obj
<< std::endl
;
1425 int err
= io_ctx
.operate(idata
.obj
, &owo
);
1429 //the key already exists and this is an exclusive insert.
1430 cerr
<< "\t" << client_name
<< ": writing key failed with "
1431 << err
<< std::endl
;
1434 case -EKEYREJECTED
: {
1435 //the object needs to be split.
1437 if (verbose
) cout
<< "\t" << client_name
<< ": running split on "
1440 err
= read_index(key
, &idata
, NULL
, true);
1442 if (verbose
) cout
<< "\t" << client_name
1443 << ": getting oid failed with code "
1444 << err
<< std::endl
;
1448 if (err
< 0 && err
!= -ENOENT
&& err
!= -EBALANCE
) {
1449 if (verbose
) cerr
<< "\t" << client_name
<< ": split failed with "
1450 << err
<< std::endl
;
1451 int ret
= handle_set_rm_errors(err
, idata
.obj
, key
, &idata
, NULL
);
1454 if (verbose
) cout
<< client_name
<< " IS SUICIDING!" << std::endl
;
1457 return set_op(key
, val
, update_on_existing
, idata
);
1462 } while (err
< 0 && err
!= -EBALANCE
&& err
!= -ENOENT
);
1463 err
= read_index(key
, &idata
, NULL
, true);
1465 if (verbose
) cout
<< "\t" << client_name
1466 << ": getting oid failed with code "
1467 << err
<< std::endl
;
1470 return set_op(key
, val
, update_on_existing
, idata
);
1473 if (verbose
) cerr
<< "\t" << client_name
<< ": writing obj failed with "
1474 << err
<< std::endl
;
1475 if (err
== -ENOENT
|| err
== -EACCES
) {
1476 if (err
== -ENOENT
) {
1477 if (verbose
) cout
<< "CACHE FAILURE" << std::endl
;
1479 err
= read_index(key
, &idata
, NULL
, true);
1481 if (verbose
) cout
<< "\t" << client_name
1482 << ": getting oid failed with code "
1483 << err
<< std::endl
;
1486 if (verbose
) cout
<< "\t" << client_name
<< ": index data is "
1488 << ", object is " << idata
.obj
<< std::endl
;
1489 return set_op(key
, val
, update_on_existing
, idata
);
1498 int KvFlatBtreeAsync::remove(const string
&key
) {
1499 if (verbose
) cout
<< client_name
<< ": removing " << key
<< std::endl
;
1504 index_data next_idata
;
1506 if (verbose
) cout
<< "\t" << client_name
<< ": finding oid" << std::endl
;
1507 err
= read_index(key
, &idata
, &next_idata
, false);
1509 if (verbose
) cout
<< "getting oid failed with code " << err
<< std::endl
;
1513 if (verbose
) cout
<< "\t" << client_name
<< ": idata is " << idata
.str()
1514 << ", next_idata is " << next_idata
.str()
1515 << ", obj is " << obj
<< std::endl
;
1517 err
= remove_op(key
, idata
, next_idata
);
1519 if (verbose
) cout
<< "\t" << client_name
<< ": finished remove with " << err
1520 << " and exiting" << std::endl
;
1524 int KvFlatBtreeAsync::remove_op(const string
&key
, index_data
&idata
,
1525 index_data
&next_idata
) {
1530 args
.omap
.insert(key
);
1533 librados::ObjectWriteOperation owo
;
1534 owo
.exec("kvs", "omap_remove", inbl
);
1535 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 ) {
1536 if (verbose
) cout
<< client_name
<< " IS SUICIDING!" << std::endl
;
1539 if (verbose
) cout
<< "\t" << client_name
<< ": removing " << key
<< " from "
1542 int err
= io_ctx
.operate(idata
.obj
, &owo
);
1544 if (verbose
) cout
<< "\t" << client_name
<< ": writing obj failed with "
1545 << err
<< std::endl
;
1548 //the key does not exist in the object
1551 case -EKEYREJECTED
: {
1552 //the object needs to be split.
1554 if (verbose
) cerr
<< "\t" << client_name
<< ": running rebalance on "
1555 << idata
.obj
<< std::endl
;
1556 err
= read_index(key
, &idata
, &next_idata
, true);
1558 if (verbose
) cout
<< "\t" << client_name
1559 << ": getting oid failed with code "
1560 << err
<< std::endl
;
1563 err
= rebalance(idata
, next_idata
);
1564 if (err
< 0 && err
!= -ENOENT
&& err
!= -EBALANCE
) {
1565 if (verbose
) cerr
<< "\t" << client_name
<< ": rebalance returned "
1566 << err
<< std::endl
;
1567 int ret
= handle_set_rm_errors(err
, idata
.obj
, key
, &idata
,
1571 if (verbose
) cout
<< client_name
<< " IS SUICIDING!" << std::endl
;
1574 return remove_op(key
, idata
, next_idata
);
1579 //this is the only node, so it's ok to go below k.
1580 librados::ObjectWriteOperation owo
;
1584 args
.omap
.insert(key
);
1586 owo
.exec("kvs", "omap_remove", inbl
);
1587 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)()
1589 if (verbose
) cout
<< client_name
<< " IS SUICIDING!"
1593 if (verbose
) cout
<< "\t" << client_name
<< ": removing " << key
1597 int err
= io_ctx
.operate(idata
.obj
, &owo
);
1603 } while (err
< 0 && err
!= -EBALANCE
&& err
!= -ENOENT
);
1604 err
= read_index(key
, &idata
, &next_idata
, true);
1606 if (verbose
) cout
<< "\t" << client_name
1607 << ": getting oid failed with code "
1608 << err
<< std::endl
;
1614 if (err
== -ENOENT
|| err
== -EACCES
) {
1615 err
= read_index(key
, &idata
, &next_idata
, true);
1617 if (verbose
) cout
<< "\t" << client_name
1618 << ": getting oid failed with code "
1619 << err
<< std::endl
;
1622 if (verbose
) cout
<< "\t" << client_name
<< ": index data is "
1624 << ", object is " << idata
.obj
<< std::endl
;
1625 //idea: we read the time every time we read the index anyway - store it.
1626 return remove_op(key
, idata
, next_idata
);
1635 int KvFlatBtreeAsync::handle_set_rm_errors(int &err
, string obj
,
1637 index_data
* idata
, index_data
* next_idata
) {
1638 if (err
== -ESUICIDE
) {
1640 } else if (err
== -ECANCELED
//if an object was unwritable or index changed
1641 || err
== -EPREFIX
//if there is currently a prefix
1642 || err
== -ETIMEDOUT
// if the index changes during the op - i.e. cleanup
1643 || err
== -EACCES
) //possible if we were acting on old index data
1645 err
= read_index(key
, idata
, next_idata
, true);
1649 if (verbose
) cout
<< "\t" << client_name
<< ": prefix is " << idata
->str()
1651 if (idata
->obj
!= obj
) {
1652 //someone else has split or cleaned up or something. start over.
1653 return 1;//meaning repeat
1655 } else if (err
!= -ETIMEDOUT
&& err
!= -ERANGE
&& err
!= -EACCES
1656 && err
!= -EUCLEAN
){
1657 if (verbose
) cout
<< "\t" << client_name
1658 << ": split encountered an unexpected error: " << err
1665 int KvFlatBtreeAsync::get(const string
&key
, bufferlist
*val
) {
1667 if (verbose
) cout
<< client_name
<< ": getting " << key
<< std::endl
;
1672 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 ) {
1675 err
= read_index(key
, &idata
, NULL
, false);
1676 mytime
= ceph_clock_now();
1678 if (verbose
) cout
<< "getting oid failed with code " << err
<< std::endl
;
1682 err
= get_op(key
, val
, idata
);
1684 if (verbose
) cout
<< client_name
<< ": got " << key
<< " with " << err
1690 int KvFlatBtreeAsync::get_op(const string
&key
, bufferlist
*val
,
1691 index_data
&idata
) {
1693 std::set
<std::string
> key_set
;
1694 key_set
.insert(key
);
1695 std::map
<std::string
,bufferlist
> omap
;
1696 librados::ObjectReadOperation read
;
1697 read
.omap_get_vals_by_keys(key_set
, &omap
, &err
);
1698 err
= io_ctx
.operate(idata
.obj
, &read
, NULL
);
1700 if (err
== -ENOENT
) {
1701 err
= read_index(key
, &idata
, NULL
, true);
1703 if (verbose
) cout
<< "\t" << client_name
1704 << ": getting oid failed with code "
1705 << err
<< std::endl
;
1708 if (verbose
) cout
<< "\t" << client_name
<< ": index data is "
1710 << ", object is " << idata
.obj
<< std::endl
;
1711 return get_op(key
, val
, idata
);
1713 if (verbose
) cout
<< client_name
1714 << ": get encountered an unexpected error: " << err
1724 void *KvFlatBtreeAsync::pset(void *ptr
) {
1725 struct aio_set_args
*args
= (struct aio_set_args
*)ptr
;
1727 args
->kvba
->KvFlatBtreeAsync::set((string
)args
->key
,
1728 (bufferlist
)args
->val
, (bool)args
->exc
);
1729 args
->cb(args
->err
, args
->cb_args
);
1734 void KvFlatBtreeAsync::aio_set(const string
&key
, const bufferlist
&val
,
1735 bool exclusive
, callback cb
, void * cb_args
, int * err
) {
1736 aio_set_args
*args
= new aio_set_args();
1740 args
->exc
= exclusive
;
1742 args
->cb_args
= cb_args
;
1745 int r
= pthread_create(&t
, NULL
, pset
, (void*)args
);
1753 void *KvFlatBtreeAsync::prm(void *ptr
) {
1754 struct aio_rm_args
*args
= (struct aio_rm_args
*)ptr
;
1756 args
->kvba
->KvFlatBtreeAsync::remove((string
)args
->key
);
1757 args
->cb(args
->err
, args
->cb_args
);
1762 void KvFlatBtreeAsync::aio_remove(const string
&key
,
1763 callback cb
, void * cb_args
, int * err
) {
1764 aio_rm_args
* args
= new aio_rm_args();
1768 args
->cb_args
= cb_args
;
1771 int r
= pthread_create(&t
, NULL
, prm
, (void*)args
);
1779 void *KvFlatBtreeAsync::pget(void *ptr
) {
1780 struct aio_get_args
*args
= (struct aio_get_args
*)ptr
;
1782 args
->kvba
->KvFlatBtreeAsync::get((string
)args
->key
,
1783 (bufferlist
*)args
->val
);
1784 args
->cb(args
->err
, args
->cb_args
);
1789 void KvFlatBtreeAsync::aio_get(const string
&key
, bufferlist
*val
,
1790 callback cb
, void * cb_args
, int * err
) {
1791 aio_get_args
* args
= new aio_get_args();
1796 args
->cb_args
= cb_args
;
1799 int r
= pthread_create(&t
, NULL
, pget
, (void*)args
);
1807 int KvFlatBtreeAsync::set_many(const std::map
<string
, bufferlist
> &in_map
) {
1811 std::set
<string
> keys
;
1813 std::map
<string
, bufferlist
> big_map
;
1814 for (map
<string
, bufferlist
>::const_iterator it
= in_map
.begin();
1815 it
!= in_map
.end(); ++it
) {
1816 keys
.insert(it
->first
);
1817 big_map
.insert(*it
);
1820 if (verbose
) cout
<< "created key set and big_map" << std::endl
;
1823 librados::AioCompletion
* aioc
= rados
.aio_create_completion();
1824 io_ctx
.aio_exec(index_name
, aioc
, "kvs", "read_many", inbl
, &outbl
);
1825 aioc
->wait_for_complete();
1826 err
= aioc
->get_return_value();
1829 cerr
<< "getting index failed with " << err
<< std::endl
;
1833 std::map
<string
, bufferlist
> imap
;//read from the index
1834 auto blit
= outbl
.cbegin();
1837 if (verbose
) cout
<< "finished reading index for objects. there are "
1838 << imap
.size() << " entries that need to be changed. " << std::endl
;
1841 vector
<object_data
> to_delete
;
1843 vector
<object_data
> to_create
;
1845 if (verbose
) cout
<< "setting up to_delete and to_create vectors from index "
1846 << "map" << std::endl
;
1847 //set up to_delete from index map
1848 for (map
<string
, bufferlist
>::iterator it
= imap
.begin(); it
!= imap
.end();
1851 blit
= it
->second
.begin();
1853 to_delete
.push_back(object_data(idata
.min_kdata
, idata
.kdata
, idata
.obj
));
1854 err
= read_object(idata
.obj
, &to_delete
[to_delete
.size() - 1]);
1856 if (verbose
) cout
<< "reading " << idata
.obj
<< " failed with " << err
1858 return set_many(in_map
);
1861 big_map
.insert(to_delete
[to_delete
.size() - 1].omap
.begin(),
1862 to_delete
[to_delete
.size() - 1].omap
.end());
1865 to_create
.push_back(object_data(
1866 to_string(client_name
, client_index
++)));
1867 to_create
[0].min_kdata
= to_delete
[0].min_kdata
;
1869 for(map
<string
, bufferlist
>::iterator it
= big_map
.begin();
1870 it
!= big_map
.end(); ++it
) {
1871 if (to_create
[to_create
.size() - 1].omap
.size() == 1.5 * k
) {
1872 to_create
[to_create
.size() - 1].max_kdata
=
1873 key_data(to_create
[to_create
.size() - 1]
1874 .omap
.rbegin()->first
);
1876 to_create
.push_back(object_data(
1877 to_string(client_name
, client_index
++)));
1878 to_create
[to_create
.size() - 1].min_kdata
=
1879 to_create
[to_create
.size() - 2].max_kdata
;
1882 to_create
[to_create
.size() - 1].omap
.insert(*it
);
1884 to_create
[to_create
.size() - 1].max_kdata
=
1885 to_delete
[to_delete
.size() - 1].max_kdata
;
1887 vector
<librados::ObjectWriteOperation
> owos(2 + 2 * to_delete
.size()
1888 + to_create
.size());
1889 vector
<std::pair
<std::pair
<int, string
>, librados::ObjectWriteOperation
*> > ops
;
1893 set_up_prefix_index(to_create
, to_delete
, &owos
[0], &idata
, &err
);
1895 if (verbose
) cout
<< "finished making to_create and to_delete. "
1898 ops
.push_back(std::make_pair(
1899 std::pair
<int, string
>(ADD_PREFIX
, index_name
),
1901 for (int i
= 1; i
< 2 + 2 * (int)to_delete
.size() + (int)to_create
.size();
1903 ops
.push_back(std::make_pair(std::make_pair(0,""), &owos
[i
]));
1906 set_up_ops(to_create
, to_delete
, &ops
, idata
, &err
);
1908 cout
<< "finished setting up ops. Starting critical section..." << std::endl
;
1910 /////BEGIN CRITICAL SECTION/////
1911 //put prefix on index entry for idata.val
1912 err
= perform_ops("\t\t" + client_name
+ "-set_many:", idata
, &ops
);
1914 return set_many(in_map
);
1916 if (verbose
) cout
<< "\t\t" << client_name
<< "-split: done splitting."
1918 /////END CRITICAL SECTION/////
1919 std::scoped_lock l
{icache_lock
};
1920 for (vector
<delete_data
>::iterator it
= idata
.to_delete
.begin();
1921 it
!= idata
.to_delete
.end(); ++it
) {
1922 icache
.erase(it
->max
);
1924 for (vector
<create_data
>::iterator it
= idata
.to_create
.begin();
1925 it
!= idata
.to_create
.end(); ++it
) {
1926 icache
.push(index_data(*it
));
1931 int KvFlatBtreeAsync::remove_all() {
1932 if (verbose
) cout
<< client_name
<< ": removing all" << std::endl
;
1934 librados::ObjectReadOperation oro
;
1935 librados::AioCompletion
* oro_aioc
= rados
.aio_create_completion();
1936 std::map
<std::string
, bufferlist
> index_set
;
1937 oro
.omap_get_vals2("",LONG_MAX
,&index_set
, nullptr, &err
);
1938 err
= io_ctx
.aio_operate(index_name
, oro_aioc
, &oro
, NULL
);
1940 if (err
== -ENOENT
) {
1943 if (verbose
) cout
<< "getting keys failed with error " << err
<< std::endl
;
1946 oro_aioc
->wait_for_complete();
1947 oro_aioc
->release();
1949 librados::ObjectWriteOperation rm_index
;
1950 librados::AioCompletion
* rm_index_aioc
= rados
.aio_create_completion();
1951 std::map
<std::string
,bufferlist
> new_index
;
1952 new_index
["1"] = index_set
["1"];
1953 rm_index
.omap_clear();
1954 rm_index
.omap_set(new_index
);
1955 io_ctx
.aio_operate(index_name
, rm_index_aioc
, &rm_index
);
1956 err
= rm_index_aioc
->get_return_value();
1957 rm_index_aioc
->release();
1959 if (verbose
) cout
<< "rm index aioc failed with " << err
1964 if (!index_set
.empty()) {
1965 for (std::map
<std::string
,bufferlist
>::iterator it
= index_set
.begin();
1966 it
!= index_set
.end(); ++it
){
1967 librados::ObjectWriteOperation sub
;
1968 if (it
->first
== "1") {
1974 auto b
= it
->second
.cbegin();
1976 io_ctx
.operate(idata
.obj
, &sub
);
1985 int KvFlatBtreeAsync::get_all_keys(std::set
<std::string
> *keys
) {
1986 if (verbose
) cout
<< client_name
<< ": getting all keys" << std::endl
;
1988 librados::ObjectReadOperation oro
;
1989 std::map
<std::string
,bufferlist
> index_set
;
1990 oro
.omap_get_vals2("",LONG_MAX
,&index_set
, nullptr, &err
);
1991 io_ctx
.operate(index_name
, &oro
, NULL
);
1993 if (verbose
) cout
<< "getting keys failed with error " << err
<< std::endl
;
1996 for (std::map
<std::string
,bufferlist
>::iterator it
= index_set
.begin();
1997 it
!= index_set
.end(); ++it
){
1998 librados::ObjectReadOperation sub
;
1999 std::set
<std::string
> ret
;
2000 sub
.omap_get_keys2("",LONG_MAX
,&ret
, nullptr, &err
);
2002 auto b
= it
->second
.cbegin();
2004 io_ctx
.operate(idata
.obj
, &sub
, NULL
);
2005 keys
->insert(ret
.begin(), ret
.end());
2010 int KvFlatBtreeAsync::get_all_keys_and_values(
2011 std::map
<std::string
,bufferlist
> *kv_map
) {
2012 if (verbose
) cout
<< client_name
<< ": getting all keys and values"
2015 librados::ObjectReadOperation first_read
;
2016 std::set
<std::string
> index_set
;
2017 first_read
.omap_get_keys2("",LONG_MAX
,&index_set
, nullptr, &err
);
2018 io_ctx
.operate(index_name
, &first_read
, NULL
);
2020 if (verbose
) cout
<< "getting keys failed with error " << err
<< std::endl
;
2023 for (std::set
<std::string
>::iterator it
= index_set
.begin();
2024 it
!= index_set
.end(); ++it
){
2025 librados::ObjectReadOperation sub
;
2026 std::map
<std::string
, bufferlist
> ret
;
2027 sub
.omap_get_vals2("",LONG_MAX
,&ret
, nullptr, &err
);
2028 io_ctx
.operate(*it
, &sub
, NULL
);
2029 kv_map
->insert(ret
.begin(), ret
.end());
2034 bool KvFlatBtreeAsync::is_consistent() {
2037 if (verbose
) cout
<< client_name
<< ": checking consistency" << std::endl
;
2038 std::map
<std::string
,bufferlist
> index
;
2039 std::map
<std::string
, std::set
<std::string
> > sub_objs
;
2040 librados::ObjectReadOperation oro
;
2041 oro
.omap_get_vals2("",LONG_MAX
,&index
, nullptr, &err
);
2042 io_ctx
.operate(index_name
, &oro
, NULL
);
2044 //probably because the index doesn't exist - this might be ok.
2045 for (librados::NObjectIterator oit
= io_ctx
.nobjects_begin();
2046 oit
!= io_ctx
.nobjects_end(); ++oit
) {
2047 //if this executes, there are floating objects.
2048 cerr
<< "Not consistent! found floating object " << oit
->get_oid()
2055 std::map
<std::string
, string
> parsed_index
;
2056 std::set
<std::string
> onames
;
2057 std::set
<std::string
> special_names
;
2058 for (map
<std::string
,bufferlist
>::iterator it
= index
.begin();
2059 it
!= index
.end(); ++it
) {
2060 if (it
->first
!= "") {
2062 auto b
= it
->second
.cbegin();
2064 if (idata
.prefix
!= "") {
2065 for(vector
<delete_data
>::iterator dit
= idata
.to_delete
.begin();
2066 dit
!= idata
.to_delete
.end(); ++dit
) {
2067 librados::ObjectReadOperation oro
;
2068 librados::AioCompletion
* aioc
= rados
.aio_create_completion();
2070 oro
.getxattr("unwritable", &un
, &err
);
2071 io_ctx
.aio_operate(dit
->obj
, aioc
, &oro
, NULL
);
2072 aioc
->wait_for_complete();
2073 err
= aioc
->get_return_value();
2074 if (ceph_clock_now() - idata
.ts
> timeout
) {
2077 if (err
== -ENOENT
) {
2080 cerr
<< "Not consistent! reading object " << dit
->obj
2081 << "returned " << err
<< std::endl
;
2086 if (atoi(string(un
.c_str(), un
.length()).c_str()) != 1 &&
2087 aioc
->get_version64() != dit
->version
) {
2088 cerr
<< "Not consistent! object " << dit
->obj
<< " has been "
2089 << " modified since the client died was not cleaned up."
2094 special_names
.insert(dit
->obj
);
2097 for(vector
<create_data
>::iterator cit
= idata
.to_create
.begin();
2098 cit
!= idata
.to_create
.end(); ++cit
) {
2099 special_names
.insert(cit
->obj
);
2102 parsed_index
.insert(std::make_pair(it
->first
, idata
.obj
));
2103 onames
.insert(idata
.obj
);
2107 //make sure that an object exists iff it either is the index
2108 //or is listed in the index
2109 for (librados::NObjectIterator oit
= io_ctx
.nobjects_begin();
2110 oit
!= io_ctx
.nobjects_end(); ++oit
) {
2111 string name
= oit
->get_oid();
2112 if (name
!= index_name
&& onames
.count(name
) == 0
2113 && special_names
.count(name
) == 0) {
2114 cerr
<< "Not consistent! found floating object " << name
<< std::endl
;
2121 for (std::map
<std::string
, string
>::iterator it
= parsed_index
.begin();
2122 it
!= parsed_index
.end();
2124 librados::ObjectReadOperation read
;
2125 read
.omap_get_keys2("", LONG_MAX
, &sub_objs
[it
->second
], nullptr, &err
);
2126 err
= io_ctx
.operate(it
->second
, &read
, NULL
);
2127 int size_int
= (int)sub_objs
[it
->second
].size();
2129 //check that size is in the right range
2130 if (it
->first
!= "1" && special_names
.count(it
->second
) == 0 &&
2131 err
!= -ENOENT
&& (size_int
> 2*k
|| size_int
< k
)
2132 && parsed_index
.size() > 1) {
2133 cerr
<< "Not consistent! Object " << *it
<< " has size " << size_int
2134 << ", which is outside the acceptable range." << std::endl
;
2138 //check that all keys belong in that object
2139 for(std::set
<std::string
>::iterator subit
= sub_objs
[it
->second
].begin();
2140 subit
!= sub_objs
[it
->second
].end(); ++subit
) {
2141 if ((it
->first
!= "1"
2142 && *subit
> it
->first
.substr(1,it
->first
.length()))
2143 || *subit
<= prev
) {
2144 cerr
<< "Not consistent! key " << *subit
<< " does not belong in "
2145 << *it
<< std::endl
;
2146 cerr
<< "not last element, i.e. " << it
->first
<< " not equal to 1? "
2147 << (it
->first
!= "1") << std::endl
2148 << "greater than " << it
->first
.substr(1,it
->first
.length())
2149 <<"? " << (*subit
> it
->first
.substr(1,it
->first
.length()))
2151 << "less than or equal to " << prev
<< "? "
2152 << (*subit
<= prev
) << std::endl
;
2157 prev
= it
->first
.substr(1,it
->first
.length());
2161 if (verbose
) cout
<< "failed consistency test - see error log"
2165 if (verbose
) cout
<< "passed consistency test" << std::endl
;
2170 string
KvFlatBtreeAsync::str() {
2172 ret
<< "Top-level map:" << std::endl
;
2174 std::set
<std::string
> keys
;
2175 std::map
<std::string
,bufferlist
> index
;
2176 librados::ObjectReadOperation oro
;
2177 librados::AioCompletion
* top_aioc
= rados
.aio_create_completion();
2178 oro
.omap_get_vals2("",LONG_MAX
,&index
, nullptr, &err
);
2179 io_ctx
.aio_operate(index_name
, top_aioc
, &oro
, NULL
);
2180 top_aioc
->wait_for_complete();
2181 err
= top_aioc
->get_return_value();
2182 top_aioc
->release();
2183 if (err
< 0 && err
!= -5){
2184 if (verbose
) cout
<< "getting keys failed with error " << err
<< std::endl
;
2188 ret
<< "There are no objects!" << std::endl
;
2192 for (map
<std::string
,bufferlist
>::iterator it
= index
.begin();
2193 it
!= index
.end(); ++it
) {
2194 keys
.insert(string(it
->second
.c_str(), it
->second
.length())
2195 .substr(1,it
->second
.length()));
2198 vector
<std::string
> all_names
;
2199 vector
<int> all_sizes(index
.size());
2200 vector
<int> all_versions(index
.size());
2201 vector
<bufferlist
> all_unwrit(index
.size());
2202 vector
<map
<std::string
,bufferlist
> > all_maps(keys
.size());
2203 vector
<map
<std::string
,bufferlist
>::iterator
> its(keys
.size());
2205 vector
<bool> dones(keys
.size());
2206 ret
<< std::endl
<< string(150,'-') << std::endl
;
2208 for (map
<std::string
,bufferlist
>::iterator it
= index
.begin();
2209 it
!= index
.end(); ++it
){
2211 auto b
= it
->second
.cbegin();
2213 string s
= idata
.str();
2214 ret
<< "|" << string((148 -
2215 ((*it
).first
.length()+s
.length()+3))/2,' ');
2218 ret
<< string(idata
.str());
2219 ret
<< string((148 -
2220 ((*it
).first
.length()+s
.length()+3))/2,' ');
2222 all_names
.push_back(idata
.obj
);
2223 ret
<< std::endl
<< string(150,'-') << std::endl
;
2228 //get the object names and sizes
2229 for(vector
<std::string
>::iterator it
= all_names
.begin(); it
2232 librados::ObjectReadOperation oro
;
2233 librados::AioCompletion
*aioc
= rados
.aio_create_completion();
2234 oro
.omap_get_vals2("", LONG_MAX
, &all_maps
[indexer
], nullptr, &err
);
2235 oro
.getxattr("unwritable", &all_unwrit
[indexer
], &err
);
2236 io_ctx
.aio_operate(*it
, aioc
, &oro
, NULL
);
2237 aioc
->wait_for_complete();
2238 if (aioc
->get_return_value() < 0) {
2239 ret
<< "reading" << *it
<< "failed: " << err
<< std::endl
;
2242 all_sizes
[indexer
] = all_maps
[indexer
].size();
2243 all_versions
[indexer
] = aioc
->get_version64();
2248 ret
<< "///////////////////OBJECT NAMES////////////////" << std::endl
;
2251 for (int i
= 0; i
< indexer
; i
++) {
2252 ret
<< "---------------------------\t";
2255 for (int i
= 0; i
< indexer
; i
++) {
2256 ret
<< "|" << string((25 -
2257 (string("Bucket: ").length() + all_names
[i
].length()))/2, ' ');
2258 ret
<< "Bucket: " << all_names
[i
];
2260 (string("Bucket: ").length() + all_names
[i
].length()))/2, ' ') << "|\t";
2263 for (int i
= 0; i
< indexer
; i
++) {
2264 its
[i
] = all_maps
[i
].begin();
2265 ret
<< "|" << string((25 - (string("size: ").length()
2266 + to_string("",all_sizes
[i
]).length()))/2, ' ');
2267 ret
<< "size: " << all_sizes
[i
];
2268 ret
<< string((25 - (string("size: ").length()
2269 + to_string("",all_sizes
[i
]).length()))/2, ' ') << "|\t";
2272 for (int i
= 0; i
< indexer
; i
++) {
2273 its
[i
] = all_maps
[i
].begin();
2274 ret
<< "|" << string((25 - (string("version: ").length()
2275 + to_string("",all_versions
[i
]).length()))/2, ' ');
2276 ret
<< "version: " << all_versions
[i
];
2277 ret
<< string((25 - (string("version: ").length()
2278 + to_string("",all_versions
[i
]).length()))/2, ' ') << "|\t";
2281 for (int i
= 0; i
< indexer
; i
++) {
2282 its
[i
] = all_maps
[i
].begin();
2283 ret
<< "|" << string((25 - (string("unwritable? ").length()
2285 ret
<< "unwritable? " << string(all_unwrit
[i
].c_str(),
2286 all_unwrit
[i
].length());
2287 ret
<< string((25 - (string("unwritable? ").length()
2288 + 1))/2, ' ') << "|\t";
2291 for (int i
= 0; i
< indexer
; i
++) {
2292 ret
<< "---------------------------\t";
2295 ret
<< "///////////////////THE ACTUAL BLOCKS////////////////" << std::endl
;
2299 for (int i
= 0; i
< indexer
; i
++) {
2300 ret
<< "---------------------------\t";
2303 //each time through this part is two lines
2304 while(done
< keys
.size()) {
2305 for(int i
= 0; i
< indexer
; i
++) {
2309 if (its
[i
] == all_maps
[i
].end()){
2314 ret
<< "|" << string((25 -
2315 ((*its
[i
]).first
.length()+its
[i
]->second
.length()+3))/2,' ');
2316 ret
<< (*its
[i
]).first
;
2318 ret
<< string(its
[i
]->second
.c_str(), its
[i
]->second
.length());
2320 ((*its
[i
]).first
.length()+its
[i
]->second
.length()+3))/2,' ');
2328 for (int i
= 0; i
< indexer
; i
++) {
2332 ret
<< "---------------------------\t";